• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3565

25 Dec 2024 05:34AM UTC coverage: 51.098% (-11.1%) from 62.21%
#3565

push

travis-ci

web-flow
Merge pull request #29316 from taosdata/enh/3.0/TD-33266

enh(ut):Add wal & config UT.

111558 of 284773 branches covered (39.17%)

Branch coverage included in aggregate %.

1 of 2 new or added lines in 2 files covered. (50.0%)

39015 existing lines in 102 files now uncovered.

177882 of 281666 relevant lines covered (63.15%)

15090998.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.64
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tqCommon.h"
20
#include "tsdb.h"
21
#include "vnd.h"
22

23
#define BATCH_ENABLE 0
24

25
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
40✔
26

UNCOV
27
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
×
UNCOV
28
  const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
29
  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
×
30
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
UNCOV
31
  if (tsem_wait(&pVnode->syncSem) != 0) {
×
32
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
×
33
  }
UNCOV
34
}
×
35

36
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
18✔
37
  if (vnodeIsMsgBlock(pMsg->msgType)) {
18!
UNCOV
38
    const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
39
    (void)taosThreadMutexLock(&pVnode->lock);
×
UNCOV
40
    if (pVnode->blocked) {
×
UNCOV
41
      vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
×
42
              TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
UNCOV
43
      pVnode->blocked = false;
×
UNCOV
44
      pVnode->blockSec = 0;
×
UNCOV
45
      pVnode->blockSeq = 0;
×
UNCOV
46
      if (tsem_post(&pVnode->syncSem) != 0) {
×
47
        vError("vgId:%d, failed to post sem", pVnode->config.vgId);
×
48
      }
49
    }
UNCOV
50
    (void)taosThreadMutexUnlock(&pVnode->lock);
×
51
  }
52
}
18✔
53

UNCOV
54
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
×
UNCOV
55
  SEpSet newEpSet = {0};
×
UNCOV
56
  syncGetRetryEpSet(pVnode->sync, &newEpSet);
×
57

UNCOV
58
  const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
59
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
×
60
          newEpSet.numOfEps, newEpSet.inUse);
UNCOV
61
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
×
UNCOV
62
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
×
63
            newEpSet.eps[i].port);
64
  }
UNCOV
65
  pMsg->info.hasEpSet = 1;
×
66

UNCOV
67
  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;
×
68

UNCOV
69
  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
×
UNCOV
70
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);
×
71

UNCOV
72
  rsp.pCont = rpcMallocCont(contLen);
×
UNCOV
73
  if (rsp.pCont == NULL) {
×
74
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
75
  } else {
UNCOV
76
    if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
×
77
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
×
78
    }
UNCOV
79
    rsp.contLen = contLen;
×
80
  }
81

UNCOV
82
  tmsgSendRsp(&rsp);
×
UNCOV
83
}
×
84

85
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
40✔
86
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
40✔
87
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
40!
UNCOV
88
    rsp.code = terrno;
×
UNCOV
89
    const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
90
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
×
91
  }
92
  if (rsp.info.handle != NULL) {
40✔
93
    tmsgSendRsp(&rsp);
38✔
94
  } else {
95
    if (rsp.pCont) {
2!
UNCOV
96
      rpcFreeCont(rsp.pCont);
×
97
    }
98
  }
99
}
40✔
100

UNCOV
101
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
×
UNCOV
102
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
×
UNCOV
103
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
×
UNCOV
104
  } else if (code == TSDB_CODE_MSG_PREPROCESSED) {
×
UNCOV
105
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
×
UNCOV
106
    if (rsp.info.handle != NULL) {
×
UNCOV
107
      tmsgSendRsp(&rsp);
×
108
    }
109
  } else {
UNCOV
110
    const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
111
    vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
×
UNCOV
112
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
×
UNCOV
113
    if (rsp.info.handle != NULL) {
×
UNCOV
114
      tmsgSendRsp(&rsp);
×
115
    }
116
  }
UNCOV
117
}
×
118

119
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
40✔
120
  int64_t seq = 0;
40✔
121

122
  (void)taosThreadMutexLock(&pVnode->lock);
40✔
123
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
40✔
124
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
40!
125
  if (wait) {
40!
UNCOV
126
    if (pVnode->blocked) {
×
127
      return TSDB_CODE_INTERNAL_ERROR;
×
128
    }
UNCOV
129
    pVnode->blocked = true;
×
UNCOV
130
    pVnode->blockSec = taosGetTimestampSec();
×
UNCOV
131
    pVnode->blockSeq = seq;
×
132
  }
133
  (void)taosThreadMutexUnlock(&pVnode->lock);
40✔
134

135
  if (code > 0) {
40!
136
    vnodeHandleWriteMsg(pVnode, pMsg);
40✔
UNCOV
137
  } else if (code < 0) {
×
UNCOV
138
    if (terrno != 0) code = terrno;
×
UNCOV
139
    vnodeHandleProposeError(pVnode, pMsg, code);
×
140
  }
141

142
  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
40!
143
  return code;
40✔
144
}
145

146
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
58✔
147
  if (!vnodeShouldCommit(pVnode, atExit)) {
58✔
148
    return;
40✔
149
  }
150

151
  int32_t   contLen = sizeof(SMsgHead);
18✔
152
  SMsgHead *pHead = rpcMallocCont(contLen);
18✔
153
  pHead->contLen = contLen;
18✔
154
  pHead->vgId = pVnode->config.vgId;
18✔
155

156
  SRpcMsg rpcMsg = {0};
18✔
157
  rpcMsg.msgType = TDMT_VND_COMMIT;
18✔
158
  rpcMsg.contLen = contLen;
18✔
159
  rpcMsg.pCont = pHead;
18✔
160
  rpcMsg.info.noResp = 1;
18✔
161

162
  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
18!
163
  bool isWeak = false;
18✔
164

165
  if (!atExit) {
18!
UNCOV
166
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
×
167
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
×
168
    }
UNCOV
169
    rpcFreeCont(rpcMsg.pCont);
×
UNCOV
170
    rpcMsg.pCont = NULL;
×
171
  } else {
172
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
18✔
173
      vTrace("vgId:%d, failed to put vnode commit to queue since %s", pVnode->config.vgId, terrstr());
16!
174
    }
175
  }
176
}
177

178
#if BATCH_ENABLE
179

180
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
181
  if (*arrSize <= 0) return;
182
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
183

184
  (void)taosThreadMutexLock(&pVnode->lock);
185
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
186
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
187
  if (wait) {
188
    pVnode->blocked = true;
189
  }
190
  (void)taosThreadMutexUnlock(&pVnode->lock);
191

192
  if (code > 0) {
193
    for (int32_t i = 0; i < *arrSize; ++i) {
194
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
195
    }
196
  } else if (code < 0) {
197
    if (terrno != 0) code = terrno;
198
    for (int32_t i = 0; i < *arrSize; ++i) {
199
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
200
    }
201
  }
202

203
  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
204
  pLastMsg = NULL;
205

206
  for (int32_t i = 0; i < *arrSize; ++i) {
207
    SRpcMsg        *pMsg = pMsgArr[i];
208
    const STraceId *trace = &pMsg->info.traceId;
209
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
210
    rpcFreeCont(pMsg->pCont);
211
    taosFreeQitem(pMsg);
212
  }
213

214
  *arrSize = 0;
215
}
216

217
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
218
  SVnode   *pVnode = pInfo->ahandle;
219
  int32_t   vgId = pVnode->config.vgId;
220
  int32_t   code = 0;
221
  SRpcMsg  *pMsg = NULL;
222
  int32_t   arrayPos = 0;
223
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
224
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
225
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
226

227
  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
228
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
229
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
230
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);
231

232
    const STraceId *trace = &pMsg->info.traceId;
233
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
234
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
235

236
    if (!pVnode->restored) {
237
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
238
             TMSG_INFO(pMsg->msgType));
239
      terrno = TSDB_CODE_SYN_RESTORING;
240
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
241
      rpcFreeCont(pMsg->pCont);
242
      taosFreeQitem(pMsg);
243
      continue;
244
    }
245

246
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
247
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
248
      terrno = TSDB_CODE_OUT_OF_MEMORY;
249
      vnodeHandleProposeError(pVnode, pMsg, terrno);
250
      rpcFreeCont(pMsg->pCont);
251
      taosFreeQitem(pMsg);
252
      continue;
253
    }
254

255
    bool atExit = false;
256
    vnodeProposeCommitOnNeed(pVnode, atExit);
257

258
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
259
    if (code != 0) {
260
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
261
      rpcFreeCont(pMsg->pCont);
262
      taosFreeQitem(pMsg);
263
      continue;
264
    }
265

266
    if (isBlock) {
267
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
268
    }
269

270
    pMsgArr[arrayPos] = pMsg;
271
    pIsWeakArr[arrayPos] = isWeak;
272
    arrayPos++;
273

274
    if (isBlock || msg == numOfMsgs - 1) {
275
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
276
    }
277
  }
278

279
  taosMemoryFree(pMsgArr);
280
  taosMemoryFree(pIsWeakArr);
281
}
282

283
#else
284

285
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
40✔
286
  SVnode  *pVnode = pInfo->ahandle;
40✔
287
  int32_t  vgId = pVnode->config.vgId;
40✔
288
  int32_t  code = 0;
40✔
289
  SRpcMsg *pMsg = NULL;
40✔
290
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
40!
291

292
  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
80✔
293
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
40!
294
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
40✔
295

296
    const STraceId *trace = &pMsg->info.traceId;
40✔
297
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
40!
298
            vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);
299

300
    if (!pVnode->restored) {
40!
UNCOV
301
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
×
302
             TMSG_INFO(pMsg->msgType));
UNCOV
303
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
×
UNCOV
304
      rpcFreeCont(pMsg->pCont);
×
UNCOV
305
      taosFreeQitem(pMsg);
×
UNCOV
306
      continue;
×
307
    }
308

309
    bool atExit = false;
40✔
310
    vnodeProposeCommitOnNeed(pVnode, atExit);
40✔
311

312
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
40✔
313
    if (code != 0) {
40!
UNCOV
314
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
×
UNCOV
315
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
×
316
      }
UNCOV
317
      vnodeHandleProposeError(pVnode, pMsg, code);
×
UNCOV
318
      rpcFreeCont(pMsg->pCont);
×
UNCOV
319
      taosFreeQitem(pMsg);
×
UNCOV
320
      continue;
×
321
    }
322

323
    code = vnodeProposeMsg(pVnode, pMsg, isWeak);
40✔
324

325
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
40!
326
    rpcFreeCont(pMsg->pCont);
40✔
327
    taosFreeQitem(pMsg);
40✔
328
  }
329
}
40✔
330

331
#endif
332

333
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
18✔
334
  SVnode  *pVnode = pInfo->ahandle;
18✔
335
  int32_t  vgId = pVnode->config.vgId;
18✔
336
  int32_t  code = 0;
18✔
337
  SRpcMsg *pMsg = NULL;
18✔
338

339
  for (int32_t i = 0; i < numOfMsgs; ++i) {
36✔
340
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
18!
341
    const STraceId *trace = &pMsg->info.traceId;
18✔
342

343
    if (vnodeIsMsgBlock(pMsg->msgType)) {
18!
UNCOV
344
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
×
345
              ", blocking msg obtained sec:%d seq:%" PRId64,
346
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
347
              pVnode->blockSeq);
348
    } else {
349
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
18!
350
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
351
    }
352

353
    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
18✔
354
    if (rsp.code == 0) {
18!
355
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
18!
UNCOV
356
        rsp.code = terrno;
×
357
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
×
358
                pMsg->info.conn.applyIndex);
359
      }
360
    }
361

362
    vnodePostBlockMsg(pVnode, pMsg);
18✔
363
    if (rsp.info.handle != NULL) {
18!
UNCOV
364
      tmsgSendRsp(&rsp);
×
365
    } else {
366
      if (rsp.pCont) {
18!
UNCOV
367
        rpcFreeCont(rsp.pCont);
×
368
      }
369
    }
370

371
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
18!
372
    rpcFreeCont(pMsg->pCont);
18✔
373
    taosFreeQitem(pMsg);
18✔
374
  }
375
}
18✔
376

UNCOV
377
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
×
UNCOV
378
  const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
379
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
×
380

UNCOV
381
  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
×
UNCOV
382
  if (code != 0) {
×
UNCOV
383
    vGError("vgId:%d, failed to process sync msg:%p type:%s, reason: %s", pVnode->config.vgId, pMsg,
×
384
            TMSG_INFO(pMsg->msgType), tstrerror(code));
385
  }
386

UNCOV
387
  return code;
×
388
}
389

UNCOV
390
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
×
391
  if (pMsg == NULL || pMsg->pCont == NULL) {
×
392
    return TSDB_CODE_INVALID_PARA;
×
393
  }
394

UNCOV
395
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
×
396
    rpcFreeCont(pMsg->pCont);
×
397
    pMsg->pCont = NULL;
×
398
    return TSDB_CODE_INVALID_PARA;
×
399
  }
400

UNCOV
401
  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
×
402
  if (code != 0) {
×
403
    rpcFreeCont(pMsg->pCont);
×
404
    pMsg->pCont = NULL;
×
405
  }
UNCOV
406
  return code;
×
407
}
408

UNCOV
409
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
×
UNCOV
410
  if (pMsg == NULL || pMsg->pCont == NULL) {
×
UNCOV
411
    return TSDB_CODE_INVALID_PARA;
×
412
  }
413

UNCOV
414
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
×
UNCOV
415
    rpcFreeCont(pMsg->pCont);
×
UNCOV
416
    pMsg->pCont = NULL;
×
417
    return TSDB_CODE_INVALID_PARA;
×
418
  }
419

UNCOV
420
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
×
UNCOV
421
  if (code != 0) {
×
UNCOV
422
    rpcFreeCont(pMsg->pCont);
×
UNCOV
423
    pMsg->pCont = NULL;
×
424
  }
UNCOV
425
  return code;
×
426
}
427

UNCOV
428
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
×
UNCOV
429
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
×
UNCOV
430
  if (code != 0) {
×
UNCOV
431
    rpcFreeCont(pMsg->pCont);
×
432
    pMsg->pCont = NULL;
×
433
  }
UNCOV
434
  return code;
×
435
}
436

437
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
360✔
438
  return vnodeGetSnapshot(pFsm->data, pSnapshot);
360✔
439
}
440

441
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
18✔
442
  SVnode *pVnode = pFsm->data;
18✔
443
  pMsg->info.conn.applyIndex = pMeta->index;
18✔
444
  pMsg->info.conn.applyTerm = pMeta->term;
18✔
445

446
  const STraceId *trace = &pMsg->info.traceId;
18✔
447
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
18!
448
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
449
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
450
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
451

452
  return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
18✔
453
}
454

455
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
18✔
456
  if (pMsg->code == 0) {
18!
457
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
18✔
458
  }
459

UNCOV
460
  const STraceId *trace = &pMsg->info.traceId;
×
461
  SVnode         *pVnode = pFsm->data;
×
462
  vnodePostBlockMsg(pVnode, pMsg);
×
463

UNCOV
464
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
×
465
  if (rsp.info.handle != NULL) {
×
466
    tmsgSendRsp(&rsp);
×
467
  }
468

UNCOV
469
  vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
×
470
  rpcFreeCont(pMsg->pCont);
×
471
  pMsg->pCont = NULL;
×
472
  return 0;
×
473
}
474

UNCOV
475
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
×
476
  if (pMeta->isWeak == 1) {
×
477
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
×
478
  }
UNCOV
479
  return 0;
×
480
}
481

482
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
83✔
483
  SVnode *pVnode = pFSM->data;
83✔
484
  return atomic_load_64(&pVnode->state.applied);
83✔
485
}
486

UNCOV
487
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
×
488
  SVnode *pVnode = pFsm->data;
×
489
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
×
490
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
491
         TMSG_INFO(pMsg->msgType));
UNCOV
492
}
×
493

UNCOV
494
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
UNCOV
495
  SVnode *pVnode = pFsm->data;
×
UNCOV
496
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
×
497
}
498

UNCOV
499
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
UNCOV
500
  SVnode *pVnode = pFsm->data;
×
UNCOV
501
  vnodeSnapReaderClose(pReader);
×
UNCOV
502
}
×
503

UNCOV
504
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
×
UNCOV
505
  SVnode *pVnode = pFsm->data;
×
UNCOV
506
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
×
507
}
508

UNCOV
509
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
UNCOV
510
  SVnode *pVnode = pFsm->data;
×
511

UNCOV
512
  do {
×
513
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
UNCOV
514
    if (itemSize == 0) {
×
UNCOV
515
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
×
UNCOV
516
      break;
×
517
    } else {
UNCOV
518
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
519
      taosMsleep(10);
×
520
    }
521
  } while (true);
522

UNCOV
523
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
×
524
}
525

UNCOV
526
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
×
UNCOV
527
  SVnode *pVnode = pFsm->data;
×
UNCOV
528
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
×
529
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
530

UNCOV
531
  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
×
UNCOV
532
  if (code != 0) {
×
UNCOV
533
    vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
×
534
           code);
535
  }
UNCOV
536
  return code;
×
537
}
538

UNCOV
539
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
×
UNCOV
540
  SVnode *pVnode = pFsm->data;
×
UNCOV
541
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
×
UNCOV
542
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
×
UNCOV
543
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
×
UNCOV
544
  return code;
×
545
}
546

547
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
18✔
548
  SVnode   *pVnode = pFsm->data;
18✔
549
  int32_t   vgId = pVnode->config.vgId;
18✔
550
  SyncIndex appliedIdx = -1;
18✔
551

552
  do {
553
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
25✔
554
    if (appliedIdx > commitIdx) {
25!
UNCOV
555
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
×
556
             appliedIdx, commitIdx);
UNCOV
557
      break;
×
558
    }
559
    if (appliedIdx == commitIdx) {
25✔
560
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
18!
561
      break;
18✔
562
    } else {
563
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
7!
564
            ", applied-index:%" PRId64,
565
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
566
      taosMsleep(10);
7✔
567
    }
568
  } while (true);
569

570
  walApplyVer(pVnode->pWal, commitIdx);
18✔
571
  pVnode->restored = true;
18✔
572

573
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
18✔
574
  streamMetaWLock(pMeta);
18✔
575

576
  if (pMeta->startInfo.tasksWillRestart) {
18!
UNCOV
577
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
×
578
    streamMetaWUnLock(pMeta);
×
579
    return;
×
580
  }
581

582
  if (vnodeIsRoleLeader(pVnode)) {
18!
583
    // start to restore all stream tasks
584
    if (tsDisableStream) {
18!
UNCOV
585
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
×
586
    } else {
587
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
18!
588
      if (pMeta->startInfo.startAllTasks == 1) {
18!
UNCOV
589
        pMeta->startInfo.restartCount += 1;
×
590
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
×
591
               pMeta->startInfo.restartCount);
592
      } else {
593
        pMeta->startInfo.startAllTasks = 1;
18✔
594
        streamMetaWUnLock(pMeta);
18✔
595

596
        tqInfo("vgId:%d stream task already loaded, start them", vgId);
18!
597
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
18✔
598
        if (code != 0) {
18!
UNCOV
599
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
×
600
        }
601
        return;
18✔
602
      }
603
    }
604
  } else {
UNCOV
605
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
×
606
  }
607

UNCOV
608
  streamMetaWUnLock(pMeta);
×
609
}
610

UNCOV
611
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
×
UNCOV
612
  SVnode *pVnode = pFsm->data;
×
UNCOV
613
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
×
614

UNCOV
615
  (void)taosThreadMutexLock(&pVnode->lock);
×
UNCOV
616
  if (pVnode->blocked) {
×
UNCOV
617
    pVnode->blocked = false;
×
618
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
619
    if (tsem_post(&pVnode->syncSem) != 0) {
×
620
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
621
    }
622
  }
UNCOV
623
  (void)taosThreadMutexUnlock(&pVnode->lock);
×
624

UNCOV
625
  if (pVnode->pTq) {
×
UNCOV
626
    tqUpdateNodeStage(pVnode->pTq, false);
×
UNCOV
627
    if (tqStopStreamTasksAsync(pVnode->pTq) != 0) {
×
UNCOV
628
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
×
629
    }
630
  }
UNCOV
631
}
×
632

UNCOV
633
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
×
UNCOV
634
  SVnode *pVnode = pFsm->data;
×
UNCOV
635
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
×
636

UNCOV
637
  (void)taosThreadMutexLock(&pVnode->lock);
×
UNCOV
638
  if (pVnode->blocked) {
×
UNCOV
639
    pVnode->blocked = false;
×
640
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
641
    if (tsem_post(&pVnode->syncSem) != 0) {
×
642
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
643
    }
644
  }
UNCOV
645
  (void)taosThreadMutexUnlock(&pVnode->lock);
×
UNCOV
646
}
×
647

648
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
18✔
649
  SVnode *pVnode = pFsm->data;
18✔
650
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
18!
651
  if (pVnode->pTq) {
18!
652
    tqUpdateNodeStage(pVnode->pTq, true);
18✔
653
  }
654
}
18✔
655

UNCOV
656
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
657
  SVnode *pVnode = pFsm->data;
×
658
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
659
  if (pVnode->pTq) {
×
660
    tqUpdateNodeStage(pVnode->pTq, true);
×
661
  }
UNCOV
662
}
×
663

UNCOV
664
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
665
  SVnode *pVnode = pFsm->data;
×
666

UNCOV
667
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
668
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
669
    return (itemSize == 0);
×
670
  } else {
UNCOV
671
    return true;
×
672
  }
673
}
674

UNCOV
675
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
×
UNCOV
676
  SVnode *pVnode = pFsm->data;
×
677

UNCOV
678
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
UNCOV
679
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
UNCOV
680
    return itemSize;
×
681
  } else {
UNCOV
682
    return TSDB_CODE_INVALID_PARA;
×
683
  }
684
}
685

686
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
18✔
687
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
18!
688
  if (pFsm == NULL) {
18!
UNCOV
689
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
690
    return NULL;
×
691
  }
692
  pFsm->data = pVnode;
18✔
693
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
18✔
694
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
18✔
695
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
18✔
696
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
18✔
697
  pFsm->FpGetSnapshot = NULL;
18✔
698
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
18✔
699
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
18✔
700
  pFsm->FpLeaderTransferCb = NULL;
18✔
701
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
18✔
702
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
18✔
703
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
18✔
704
  pFsm->FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader;
18✔
705
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
18✔
706
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;
18✔
707
  pFsm->FpReConfigCb = NULL;
18✔
708
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
18✔
709
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
18✔
710
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
18✔
711
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
18✔
712
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
18✔
713
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;
18✔
714

715
  return pFsm;
18✔
716
}
717

718
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
18✔
719
  SSyncInfo syncInfo = {
18✔
720
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
721
      .batchSize = 1,
722
      .vgId = pVnode->config.vgId,
18✔
723
      .syncCfg = pVnode->config.syncCfg,
724
      .pWal = pVnode->pWal,
18✔
725
      .msgcb = &pVnode->msgCb,
18✔
726
      .syncSendMSg = vnodeSyncSendMsg,
727
      .syncEqMsg = vnodeSyncEqMsg,
728
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
729
      .pingMs = 5000,
730
      .electMs = 4000,
731
      .heartbeatMs = 700,
732
  };
733

734
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
18✔
735
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
18✔
736

737
  SSyncCfg *pCfg = &syncInfo.syncCfg;
18✔
738
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
18!
739
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
36✔
740
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
18✔
741
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
18!
742
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
743
  }
744

745
  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
18✔
746
  if (pVnode->sync <= 0) {
18!
UNCOV
747
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
×
UNCOV
748
    return terrno;
×
749
  }
750

751
  return 0;
18✔
752
}
753

754
int32_t vnodeSyncStart(SVnode *pVnode) {
18✔
755
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
18!
756
  int32_t code = syncStart(pVnode->sync);
18✔
757
  if (code) {
18!
UNCOV
758
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
×
759
    return code;
×
760
  }
761
  return 0;
18✔
762
}
763

764
void vnodeSyncPreClose(SVnode *pVnode) {
18✔
765
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
18!
766
  int32_t code = syncLeaderTransfer(pVnode->sync);
18✔
767
  if (code) {
18!
UNCOV
768
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(code));
×
769
  }
770
  syncPreStop(pVnode->sync);
18✔
771

772
  (void)taosThreadMutexLock(&pVnode->lock);
18✔
773
  if (pVnode->blocked) {
18!
UNCOV
774
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
×
UNCOV
775
    pVnode->blocked = false;
×
UNCOV
776
    if (tsem_post(&pVnode->syncSem) != 0) {
×
UNCOV
777
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
×
778
    }
779
  }
780
  (void)taosThreadMutexUnlock(&pVnode->lock);
18✔
781
}
18✔
782

783
void vnodeSyncPostClose(SVnode *pVnode) {
18✔
784
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
18!
785
  syncPostStop(pVnode->sync);
18✔
786
}
18✔
787

788
void vnodeSyncClose(SVnode *pVnode) {
18✔
789
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
18!
790
  syncStop(pVnode->sync);
18✔
791
}
18✔
792

UNCOV
793
void vnodeSyncCheckTimeout(SVnode *pVnode) {
×
UNCOV
794
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
×
UNCOV
795
  (void)taosThreadMutexLock(&pVnode->lock);
×
UNCOV
796
  if (pVnode->blocked) {
×
UNCOV
797
    int32_t curSec = taosGetTimestampSec();
×
UNCOV
798
    int32_t delta = curSec - pVnode->blockSec;
×
UNCOV
799
    if (delta > VNODE_TIMEOUT_SEC) {
×
UNCOV
800
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
×
801
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
UNCOV
802
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
×
803
#if 0
804
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
805
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
806
              rpcMsg.info.handle, rpcMsg.info.ahandle);
807
        rpcSendResponse(&rpcMsg);
808
#endif
809
      }
UNCOV
810
      pVnode->blocked = false;
×
811
      pVnode->blockSec = 0;
×
812
      pVnode->blockSeq = 0;
×
813
      if (tsem_post(&pVnode->syncSem) != 0) {
×
814
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
×
815
      }
816
    }
817
  }
UNCOV
818
  (void)taosThreadMutexUnlock(&pVnode->lock);
×
UNCOV
819
}
×
820

821
bool vnodeIsRoleLeader(SVnode *pVnode) {
36✔
822
  SSyncState state = syncGetState(pVnode->sync);
36✔
823
  return state.state == TAOS_SYNC_STATE_LEADER;
36✔
824
}
825

826
bool vnodeIsLeader(SVnode *pVnode) {
18✔
827
  terrno = 0;
18✔
828
  SSyncState state = syncGetState(pVnode->sync);
18✔
829

830
  if (terrno != 0) {
18!
UNCOV
831
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
×
UNCOV
832
    return false;
×
833
  }
834

835
  if (state.state != TAOS_SYNC_STATE_LEADER) {
18!
UNCOV
836
    terrno = TSDB_CODE_SYN_NOT_LEADER;
×
837
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
×
838
    return false;
×
839
  }
840

841
  if (!state.restored || !pVnode->restored) {
18!
UNCOV
842
    terrno = TSDB_CODE_SYN_RESTORING;
×
UNCOV
843
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
×
844
    return false;
×
845
  }
846

847
  return true;
18✔
848
}
849

UNCOV
850
int64_t vnodeClusterId(SVnode *pVnode) {
×
851
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
×
852
  return syncCfg->nodeInfo[syncCfg->myIndex].clusterId;
×
853
}
854

855
int32_t vnodeNodeId(SVnode *pVnode) {
18✔
856
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
18✔
857
  return syncCfg->nodeInfo[syncCfg->myIndex].nodeId;
18✔
858
}
859

860
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
360✔
861
  int code = 0;
360✔
862
  pSnap->lastApplyIndex = pVnode->state.committed;
360✔
863
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
360✔
864
  pSnap->lastConfigIndex = -1;
360✔
865
  pSnap->state = SYNC_FSM_STATE_COMPLETE;
360✔
866

867
  if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
360!
UNCOV
868
    pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
×
869
  }
870

871
  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
360!
UNCOV
872
    code = tsdbSnapPrepDescription(pVnode, pSnap);
×
873
  }
874
  return code;
360✔
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc