• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.35
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tsdb.h"
20
#include "vnd.h"
21
#include "stream.h"
22

23
#define BATCH_ENABLE 0
24

25
// static int32_t inline vnodeShouldRewriteSubmitMsg(SVnode *pVnode, SRpcMsg **pMsg);
26
/* Tells whether a message type is proposed as a weak entry; no type currently is. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
525,497,154✔
27

28
/*
 * Traces the blocking message and then waits on pVnode->syncSem.
 * A failed wait is only logged; the function returns normally either way.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);

  const bool waitFailed = (tsem_wait(&pVnode->syncSem) != 0);
  if (waitFailed) {
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}
925,672✔
35

36
/*
 * Releases a waiter on pVnode->syncSem once a blocking message has been handled.
 * Non-blocking message types are ignored. The blocked flag and its bookkeeping
 * (blockSec, blockSeq) are cleared under pVnode->lock before the semaphore is posted.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);

    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;

    const bool postFailed = (tsem_post(&pVnode->syncSem) != 0);
    if (postFailed) {
      vError("vgId:%d, failed to post sem", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
116,858,878✔
52

53
/*
 * Answers pMsg with a redirect response whose payload is the retry endpoint set
 * obtained from syncGetRetryEpSet().
 *
 * pVnode  vnode whose sync handle supplies the endpoint set
 * pMsg    request being redirected; its info.hasEpSet is set to 1 and, if the
 *         response payload cannot be allocated, its code is set to
 *         TSDB_CODE_OUT_OF_MEMORY
 * code    response code; 0 is replaced by TSDB_CODE_SYN_NOT_LEADER
 *
 * The response is always sent. When the endpoint set cannot be sized or the
 * payload cannot be allocated, the response goes out without a payload.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is redirect since not leader, numOfEps:%d inUse:%d",
          pVnode->config.vgId, pMsg, newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i,
            newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};

  // First pass only computes the encoded size; a negative value signals failure
  // and must never reach the allocator as a length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);
  if (contLen < 0) {
    vError("vgId:%d, failed to get serialized size of ep set", pVnode->config.vgId);
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      vError("vgId:%d, failed to allocate redirect response since out of memory", pVnode->config.vgId);
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
        vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
      }
      rsp.contLen = contLen;
    }
  }

  tmsgSendRsp(&rsp);
}
13,035,558✔
82

83
/*
 * Applies a write message immediately via vnodeProcessWriteMsg() and replies.
 * On failure the response code is taken from terrno. If the message carries no
 * RPC handle, no response is sent and any response payload is released instead.
 */
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  int32_t applyRet = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
  if (applyRet < 0) {
    rsp.code = terrno;
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply right now since %s", pVnode->config.vgId, pMsg,
            terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
    return;
  }

  // Nobody to answer: drop whatever payload the apply step produced.
  if (rsp.pCont) {
    rpcFreeCont(rsp.pCont);
  }
}
475,044,790✔
100

101
/*
 * Replies to a message that could not be proposed.
 *  - TSDB_CODE_SYN_NOT_LEADER / TSDB_CODE_SYN_RESTORING: redirect the client.
 *  - TSDB_CODE_MSG_PREPROCESSED: reply with TSDB_CODE_SUCCESS.
 *  - anything else: log the failure and reply with that code.
 * A reply is only sent when the message has an RPC handle.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  const bool needRedirect = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (needRedirect) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const bool preprocessed = (code == TSDB_CODE_MSG_PREPROCESSED);
  if (!preprocessed) {
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg,
            tstrerror(code), code);
  }

  SRpcMsg rsp = {.code = preprocessed ? TSDB_CODE_SUCCESS : code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
}
19,288,635✔
118

119
/*
 * Encodes one table's submit data: flags, suid, uid, sver, the row count, each
 * row, and finally ctimeMs when it is positive.
 * Returns 0 on success or the first failing encoder code.
 */
static int32_t tEncodeSubSubmitReq2(SEncoder *pEncoder, SSubmitTbData *pSubmitTbData) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* fixed header fields */
  TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pSubmitTbData->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->suid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->uid));
  TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pSubmitTbData->sver));
  TAOS_CHECK_EXIT(tEncodeU64v(pEncoder, taosArrayGetSize(pSubmitTbData->aRowP)));

  /* row payload */
  int32_t numRows = taosArrayGetSize(pSubmitTbData->aRowP);
  SRow  **ppRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
  int32_t idx = 0;
  while (idx < numRows) {
    TAOS_CHECK_EXIT(tEncodeRow(pEncoder, ppRow[idx]));
    ++idx;
  }

  /* optional creation time */
  if (pSubmitTbData->ctimeMs > 0) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->ctimeMs));
  }

  tEndEncode(pEncoder);

_exit:
  return code;
}
143
/*
 * Encodes a whole submit request: the number of per-table entries followed by
 * each entry (see tEncodeSubSubmitReq2). A failure to start encoding or to
 * encode an entry is reported as TSDB_CODE_INVALID_MSG; any failure is logged
 * at debug level before returning.
 */
static int32_t tEncodeSubmitReq2(SEncoder *pEncoder, SSubmitReq2 *pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  if (tStartEncode(pEncoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  TAOS_CHECK_EXIT(tEncodeU64v(pEncoder, taosArrayGetSize(pReq->aSubmitTbData)));

  int32_t iTb = 0;
  while (iTb < taosArrayGetSize(pReq->aSubmitTbData)) {
    int32_t subRet = tEncodeSubSubmitReq2(pEncoder, taosArrayGet(pReq->aSubmitTbData, iTb));
    if (subRet < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    iTb++;
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    vDebug("failed to encode submit req since %s", tstrerror(code));
  }
  return code;
}
167
/*
 * Encodes one table's submit data like tEncodeSubSubmitReq2(), but first, when
 * the SUBMIT_REQ_WITH_BLOB flag is set, writes a sequence number into each row
 * at the offset recorded in the blob set's seq table.
 *
 * pVnode         currently unused (the blob-append call that needed it is disabled)
 * pEncoder       destination encoder
 * pSubmitTbData  table data to encode; rows may be patched in place
 *
 * Returns 0 on success, the first failing encoder code, or
 * TSDB_CODE_INVALID_PARA when the blob flag is set but the blob set is missing
 * or describes more entries than there are rows.
 */
static int32_t tEncodeSubSubmitAndUpdate(SVnode *pVnode, SEncoder *pEncoder, SSubmitTbData *pSubmitTbData) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pSubmitTbData->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->suid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->uid));
  TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pSubmitTbData->sver));
  TAOS_CHECK_EXIT(tEncodeU64v(pEncoder, taosArrayGetSize(pSubmitTbData->aRowP)));

  int32_t nRow = taosArrayGetSize(pSubmitTbData->aRowP);
  SRow  **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

  if (pSubmitTbData->flags & SUBMIT_REQ_WITH_BLOB) {
    SBlobSet *pBlobSet = pSubmitTbData->pBlobSet;
    if (pBlobSet == NULL) {
      // Flag promises a blob set; without one there is nothing to index.
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }

    int32_t sz = taosArrayGetSize(pBlobSet->pSeqTable);
    if (sz > nRow) {
      // Entry i patches row i, so the seq table must not outrun the row array.
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }

    // NOTE: seq stays 0 because the bseAppend() call that would assign it is
    // disabled:
    //   code = bseAppend(pVnode->pBse, &seq, pBlobSet->data + p->offset, p->len);
    uint64_t seq = 0;
    for (int32_t i = 0; i < sz; i++) {
      SBlobValue *p = taosArrayGet(pBlobSet->pSeqTable, i);
      if (p == NULL) {
        code = TSDB_CODE_INVALID_PARA;
        goto _exit;
      }
      memcpy(rows[i]->data + p->dataOffset, (void *)&seq, sizeof(uint64_t));
    }
  }

  for (int32_t iRow = 0; iRow < nRow; iRow++) {
    TAOS_CHECK_EXIT(tEncodeRow(pEncoder, rows[iRow]));
  }

  if (pSubmitTbData->ctimeMs > 0) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pSubmitTbData->ctimeMs));
  }
  tEndEncode(pEncoder);

_exit:
  return code;
}
210
/*
 * Proposes pMsg to the sync layer and handles the immediate outcome.
 *
 * syncPropose() is called under pVnode->lock. When it returns 0 for a blocking
 * message type, the vnode is marked blocked (with the current timestamp and the
 * proposal sequence) and the caller waits on the block semaphore; if the vnode
 * is already blocked, TSDB_CODE_INTERNAL_ERROR is returned instead.
 * A positive return applies the write right away; a negative return is answered
 * through vnodeHandleProposeError(), preferring terrno when it is set.
 *
 * Returns the (possibly terrno-adjusted) propose code.
 */
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t proposeSeq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t    code = syncPropose(pVnode->sync, pMsg, isWeak, &proposeSeq);
  const bool mustBlock = (code == 0) && vnodeIsMsgBlock(pMsg->msgType);
  if (mustBlock) {
    if (pVnode->blocked) {
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = proposeSeq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (mustBlock) {
    // code is 0 here, so neither the apply nor the error branch applies.
    vnodeWaitBlockMsg(pVnode, pMsg);
    return code;
  }

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) {
      code = terrno;
    }
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  return code;
}
243

244
/*
 * Issues a TDMT_VND_COMMIT message when vnodeShouldCommit() says one is due.
 *
 * pVnode  vnode to commit
 * atExit  false: propose the commit through vnodeProposeMsg() and free the
 *                message body afterwards;
 *         true:  hand the message to the write queue instead.
 *
 * The message is marked noResp. Failures are logged only; if the message body
 * cannot be allocated the commit is skipped.
 */
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check the header writes below would dereference NULL.
    vError("vgId:%d, failed to propose vnode commit since out of memory", pVnode->config.vgId);
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    int32_t code = 0;
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
}
276

277
#if BATCH_ENABLE
278

279
/*
 * Proposes the first *arrSize messages of pMsgArr as one batch, handles the
 * outcome for every message, frees them all and resets *arrSize to 0.
 *
 * If the batch is accepted (code 0) and its last message is a blocking type,
 * the vnode is marked blocked and the caller waits on the block semaphore.
 * A positive code applies every message immediately; a negative code answers
 * every message through vnodeHandleProposeError().
 *
 * Only compiled when BATCH_ENABLE is non-zero.
 */
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // Same predicate name as every other call site in this file.
  bool    wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg *pMsg = pMsgArr[i];
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}
314

315
/*
 * Batch variant of the write-queue consumer (compiled only when BATCH_ENABLE is
 * non-zero). Drains up to numOfMsgs items from qall, pre-processes each one and
 * groups them into batches that are proposed together. A blocking message
 * flushes the pending batch first and is then proposed on its own.
 *
 * Messages that cannot be processed (vnode not restored, scratch arrays not
 * allocated, pre-processing failed) are answered through
 * vnodeHandleProposeError() and freed. Whatever is still pending after the
 * loop is flushed, so no dequeued message is left unproposed or unfreed.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      }
      // Answer the sender, as the non-batch consumer does.
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // The last iteration may have left through a `continue`, skipping the
  // in-loop flush; propose whatever is still pending (no-op when empty).
  if (pMsgArr != NULL && pIsWeakArr != NULL) {
    vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}
379

380
#else
381

382
/*
 * Write-queue consumer: drains up to numOfMsgs items from qall and proposes
 * each one individually.
 *
 * Per message:
 *  - vnode not restored yet  -> answer with TSDB_CODE_SYN_RESTORING;
 *  - otherwise propose a commit if one is due, pre-process the message, and
 *    either answer the pre-process result or propose the message.
 * Every dequeued message has its body and queue item freed before moving on.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t iMsg = 0; iMsg < numOfMsgs; iMsg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p",
            vgId, pMsg, isWeak, vnodeIsMsgBlock(pMsg->msgType), iMsg, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId,
             "vgId:%d, msg:%p, failed to process since restore not finished, type:%s, seqNum:%" PRId64, vgId, pMsg,
             TMSG_INFO(pMsg->msgType), pMsg->info.seqNum);
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else {
      vnodeProposeCommitOnNeed(pVnode, false);

      code = vnodePreProcessWriteMsg(pVnode, pMsg);
      if (code == 0) {
        code = vnodeProposeMsg(pVnode, pMsg, isWeak);
        vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", vgId, pMsg, code);
      } else {
        // TSDB_CODE_MSG_PREPROCESSED is answered like the others but is not logged as an error.
        if (code != TSDB_CODE_MSG_PREPROCESSED) {
          vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, tstrerror(code));
        }
        vnodeHandleProposeError(pVnode, pMsg, code);
      }
    }

    /* common tail: every path releases the message */
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
517,343,557✔
427

428
#endif
429

430
/*
 * Apply-queue consumer: drains up to numOfMsgs items from qall and applies each.
 *
 * A message whose code is 0 is applied with vnodeProcessWriteMsg(); while that
 * returns TSDB_CODE_VND_WRITE_DISABLED the call is retried every 200 ms (the
 * retry is logged on every 100th attempt). Any negative result is stored in the
 * response code. Afterwards a blocked proposer is released, the response is
 * sent if the message has an RPC handle (otherwise its payload is freed), and
 * the message itself is freed.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t iMsg = 0; iMsg < numOfMsgs; ++iMsg) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      int32_t retries = 0;
      for (;;) {
        int32_t ret = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
        if (ret < 0) {
          rsp.code = ret;
          vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply since %s, index:%" PRId64, vgId, pMsg,
                  tstrerror(ret), pMsg->info.conn.applyIndex);
        }
        if (ret != TSDB_CODE_VND_WRITE_DISABLED) {
          break;
        }

        if (retries % 100 == 0) {
          vGError(&pMsg->info.traceId,
                  "vgId:%d, msg:%p, failed to apply since %s, retry after 200ms, retry count:%d index:%" PRId64, vgId,
                  pMsg, tstrerror(ret), retries, pMsg->info.conn.applyIndex);
        }
        retries++;
        taosMsleep(200);  // back off before the next attempt
      }
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code,
            pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
101,244,361✔
488

489
/*
 * Forwards a message taken from the vnode-sync queue to syncProcessMsg().
 * Returns that function's result; a non-zero result is logged as an error.
 * pRsp is not used here.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-sync queue, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  const int32_t syncRet = syncProcessMsg(pVnode->sync, pMsg);
  if (syncRet == 0) {
    return syncRet;
  }

  vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since %s, type:%s", pVnode->config.vgId, pMsg, tstrerror(syncRet),
          TMSG_INFO(pMsg->msgType));
  return syncRet;
}
500

501
/*
 * Enqueues a sync control message on SYNC_RD_QUEUE.
 * Returns TSDB_CODE_INVALID_PARA when the message or its body is NULL, or when
 * no queue callback is available; otherwise the enqueue result. On any failure
 * after the first check the message body is freed and pCont is set to NULL.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t    code = canEnqueue ? tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg) : TSDB_CODE_INVALID_PARA;

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
519

520
/*
 * Enqueues a sync message on SYNC_QUEUE.
 * Returns TSDB_CODE_INVALID_PARA when the message or its body is NULL, or when
 * no queue callback is available; otherwise the enqueue result. On any failure
 * after the first check the message body is freed and pCont is set to NULL.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t    code = canEnqueue ? tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg) : TSDB_CODE_INVALID_PARA;

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
538

539
/*
 * Sends a sync request to the given endpoint set and returns the send result.
 * The message body is not freed here, even when the send fails.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  return tmsgSendSyncReq(pEpSet, pMsg);
}
547

548
/* FSM callback: fills pSnapshot via vnodeGetSnapshot() for the vnode stored in pFsm->data. */
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  return vnodeGetSnapshot(pVnode, pSnapshot);
}
551

552
/*
 * Stamps the message with the committed index and term from pMeta and puts it
 * on the vnode's APPLY_QUEUE. Returns the enqueue result.
 *
 * Enqueue failures are logged. For TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE the
 * warning is emitted only each time the per-vnode error counter reaches
 * APPLY_QUEUE_ERROR_THRESHOLD, after which the counter restarts from 0.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;

  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGDebug(&pMsg->info.traceId,
          "vgId:%d, index:%" PRId64 ", execute commit cb, fsm:%p, term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pMeta->index, pFsm, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  const int32_t putRet = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (putRet >= 0) {
    return putRet;
  }

  if (putRet != TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
    vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(putRet));
    return putRet;
  }

  // Queue-memory exhaustion: count it and only warn once per threshold.
  pVnode->applyQueueErrorCount++;
  if (pVnode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
    pVnode->applyQueueErrorCount = 0;
    vWarn("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(putRet));
  }
  return putRet;
}
577

578
/*
 * FSM commit callback.
 * A message with code 0 is handed to vnodeSyncApplyMsg(). A message that already
 * carries an error is not applied: a blocked proposer is released, the error is
 * sent back if the message has an RPC handle, the body is freed, and 0 is returned.
 */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code != 0) {
    SVnode *pVnode = pFsm->data;
    vnodePostBlockMsg(pVnode, pMsg);

    SRpcMsg rsp = {
        .code = pMsg->code,
        .info = pMsg->info,
    };
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code,
            pMeta->index);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return 0;
  }

  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
601

602
/*
 * FSM pre-commit callback: weak entries (pMeta->isWeak == 1) are queued for
 * apply right away; everything else is left alone and 0 is returned.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
608

609
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
600,583,086✔
610
  SVnode *pVnode = pFSM->data;
600,583,086✔
611
  return atomic_load_64(&pVnode->state.applied);
600,605,578✔
612
}
613

614
/* FSM rollback callback: only records the rollback at debug level; nothing is undone here. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;

  vGDebug(&pMsg->info.traceId,
          "vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
          TMSG_INFO(pMsg->msgType));
}
×
621

622
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
17,767✔
623
  SVnode *pVnode = pFsm->data;
17,767✔
624
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
17,767✔
625
}
626

627
/* FSM callback: closes a snapshot reader previously opened by vnodeSnapshotStartRead(). */
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;  // the reader is closed without consulting the vnode
  vnodeSnapReaderClose(pReader);
}
17,767✔
631

632
/* FSM callback: reads the next snapshot block from pReader into *ppBuf / *len. */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;  // the read goes through the reader alone
  uint8_t **ppData = (uint8_t **)ppBuf;
  return vnodeSnapRead(pReader, ppData, len);
}
636

637
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
17,767✔
638
  SVnode *pVnode = pFsm->data;
17,767✔
639

640
  do {
×
641
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
17,767✔
642
    if (itemSize == 0) {
17,767✔
643
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
17,767✔
644
      break;
17,767✔
645
    } else {
646
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
647
      taosMsleep(10);
×
648
    }
649
  } while (true);
650

651
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
17,767✔
652
}
653

654
/*
 * FSM callback: closes the snapshot writer. The writer is told to roll back
 * when isApply is false (it receives !isApply). Returns the close result and
 * logs an error when it is non-zero.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  const int32_t closeRet = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (closeRet == 0) {
    return closeRet;
  }

  vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
         closeRet);
  return closeRet;
}
666

667
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
129,930✔
668
  SVnode *pVnode = pFsm->data;
129,930✔
669
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
129,930✔
670
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
129,930✔
671
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
129,930✔
672
  return code;
129,930✔
673
}
674

675
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
4,226,201✔
676
  SVnode   *pVnode = pFsm->data;
4,226,201✔
677
  int32_t   vgId = pVnode->config.vgId;
4,227,290✔
678
  SyncIndex appliedIdx = -1;
4,221,471✔
679

680
  do {
681
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
8,234,194✔
682
    if (appliedIdx > commitIdx) {
8,237,077✔
683
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
1,782✔
684
             appliedIdx, commitIdx);
685
      break;
×
686
    }
687
    if (appliedIdx == commitIdx) {
8,235,402✔
688
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
4,229,730✔
689
      break;
4,230,296✔
690
    } else {
691
      if (appliedIdx % 10 == 0) {
4,005,672✔
692
        vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
76,336✔
693
              ", applied-index:%" PRId64,
694
              vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
695
      } else {
696
        vDebug("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
3,929,336✔
697
               ", applied-index:%" PRId64,
698
               vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
699
      }
700
      taosMsleep(10);
4,021,083✔
701
    }
702
  } while (true);
703

704
  walApplyVer(pVnode->pWal, commitIdx);
4,230,296✔
705
  pVnode->restored = true;
4,230,296✔
706
}
4,230,296✔
707

708
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
1,761,419✔
709
  SVnode *pVnode = pFsm->data;
1,761,419✔
710
  vInfo("vgId:%d, becomefollower callback", pVnode->config.vgId);
1,761,419✔
711

712
  (void)taosThreadMutexLock(&pVnode->lock);
1,761,419✔
713
  if (pVnode->blocked) {
1,760,622✔
714
    pVnode->blocked = false;
×
715
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
716
    if (tsem_post(&pVnode->syncSem) != 0) {
×
717
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
718
    }
719
  }
720
  (void)taosThreadMutexUnlock(&pVnode->lock);
1,760,622✔
721

722
  streamRemoveVnodeLeader(pVnode->config.vgId);
1,761,419✔
723
}
1,761,419✔
724

725
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
89,649✔
726
  SVnode *pVnode = pFsm->data;
89,649✔
727
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
89,649✔
728

729
  (void)taosThreadMutexLock(&pVnode->lock);
89,649✔
730
  if (pVnode->blocked) {
89,649✔
731
    pVnode->blocked = false;
×
732
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
733
    if (tsem_post(&pVnode->syncSem) != 0) {
×
734
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
735
    }
736
  }
737
  (void)taosThreadMutexUnlock(&pVnode->lock);
89,649✔
738

739
  streamRemoveVnodeLeader(pVnode->config.vgId);  
89,649✔
740
}
89,649✔
741

742
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
3,259,456✔
743
  SVnode *pVnode = pFsm->data;
3,259,456✔
744
  vInfo("vgId:%d, become leader callback", pVnode->config.vgId);
3,260,827✔
745

746
  streamAddVnodeLeader(pVnode->config.vgId);
3,268,930✔
747
}
3,265,384✔
748

749
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
750
  SVnode *pVnode = pFsm->data;
×
751
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
752

753
  streamAddVnodeLeader(pVnode->config.vgId);
×
754
}
×
755

756
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
757
  SVnode *pVnode = pFsm->data;
×
758

759
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
760
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
761
    return (itemSize == 0);
×
762
  } else {
763
    return true;
×
764
  }
765
}
766

767
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
16,002,197✔
768
  SVnode *pVnode = pFsm->data;
16,002,197✔
769

770
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
16,009,146✔
771
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
16,018,038✔
772
    return itemSize;
16,004,887✔
773
  } else {
774
    return TSDB_CODE_INVALID_PARA;
×
775
  }
776
}
777

778
// Allocate an SSyncFSM and wire it to this vnode's sync callbacks.
// Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when allocation fails.
// Callbacks the vnode does not provide are left NULL.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pFsm = (SSyncFSM){
      .data = pVnode,

      // log commit / apply pipeline
      .FpCommitCb = vnodeSyncCommitMsg,
      .FpAppliedIndexCb = vnodeSyncAppliedIndex,
      .FpPreCommitCb = vnodeSyncPreCommitMsg,
      .FpRollBackCb = vnodeSyncRollBackMsg,
      .FpApplyQueueEmptyCb = vnodeApplyQueueEmpty,
      .FpApplyQueueItems = vnodeApplyQueueItems,

      // restore
      .FpRestoreFinishCb = vnodeRestoreFinish,
      .FpAfterRestoredCb = NULL,

      // role changes
      .FpLeaderTransferCb = NULL,
      .FpBecomeLeaderCb = vnodeBecomeLeader,
      .FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader,
      .FpBecomeFollowerCb = vnodeBecomeFollower,
      .FpBecomeLearnerCb = vnodeBecomeLearner,
      .FpReConfigCb = NULL,

      // snapshot info and transfer
      .FpGetSnapshot = NULL,
      .FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo,
      .FpSnapshotStartRead = vnodeSnapshotStartRead,
      .FpSnapshotStopRead = vnodeSnapshotStopRead,
      .FpSnapshotDoRead = vnodeSnapshotDoRead,
      .FpSnapshotStartWrite = vnodeSnapshotStartWrite,
      .FpSnapshotStopWrite = vnodeSnapshotStopWrite,
      .FpSnapshotDoWrite = vnodeSnapshotDoWrite,
  };

  return pFsm;
}
810

811
// Open the sync instance for a vnode and store its handle in pVnode->sync.
//   pVnode       - vnode whose config, WAL and message callbacks seed the sync settings
//   path         - vnode directory; the sync directory is "<path><TD_DIRSEP>sync"
//   vnodeVersion - forwarded to syncOpen()
// Returns 0 on success. On failure returns terrno, either from the FSM
// allocation or from syncOpen().
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .mountVgId = pVnode->config.mountVgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = tsVnodeElectIntervalMs,
      .heartbeatMs = tsVnodeHeartbeatIntervalMs,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // vnodeSyncMakeFsm() already set terrno; never hand a NULL FSM to syncOpen().
    vError("vgId:%d, failed to create sync fsm since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d, electMs:%d, heartbeatMs:%d", pVnode->config.vgId,
        pCfg->replicaNum, pCfg->myIndex, syncInfo.electMs, syncInfo.heartbeatMs);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}
848

849
// Start the sync instance previously stored in pVnode->sync.
// Returns 0 on success, otherwise the code returned by syncStart() (after logging it).
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  const int32_t startCode = syncStart(pVnode->sync);
  if (startCode != 0) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(startCode));
  }
  return startCode;
}
858

859
// Reset the sync timers of this vnode from the global settings
// tsVnodeElectIntervalMs and tsVnodeHeartbeatIntervalMs.
// The `ms` parameter is not used by this function.
// Returns 0 on success, otherwise the code returned by syncResetTimer() (after logging it).
int32_t vnodeSetSyncTimeout(SVnode *pVnode, int32_t ms) {
  const int32_t resetCode = syncResetTimer(pVnode->sync, tsVnodeElectIntervalMs, tsVnodeHeartbeatIntervalMs);
  if (resetCode != 0) {
    vError("vgId:%d, failed to vnode Set SyncTimeout since %s", pVnode->config.vgId, tstrerror(resetCode));
  }
  return resetCode;
}
867

868
// Forward `ms` to syncSetElectBaseline() for this vnode's sync instance.
// Returns 0 on success, otherwise the code returned by syncSetElectBaseline() (after logging it).
int32_t vnodeSetElectBaseline(SVnode *pVnode, int32_t ms) {
  const int32_t setCode = syncSetElectBaseline(pVnode->sync, ms);
  if (setCode != 0) {
    vError("vgId:%d, failed to set electBaseline since %s", pVnode->config.vgId, tstrerror(setCode));
  }
  return setCode;
}
876

877
// First stage of closing sync for a vnode: attempt a leader transfer, call
// syncPreStop(), then - under pVnode->lock - clear a pending `blocked` flag
// and post syncSem. A failed leader transfer is logged and otherwise ignored.
void vnodeSyncPreClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, vnode sync pre close", vgId);

  const int32_t transferCode = syncLeaderTransfer(pVnode->sync);
  if (transferCode != 0) {
    vError("vgId:%d, failed to transfer leader since %s", vgId, tstrerror(transferCode));
  }
  syncPreStop(pVnode->sync);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", vgId);
    pVnode->blocked = false;
    const int32_t postRet = tsem_post(&pVnode->syncSem);
    if (postRet != 0) {
      vError("vgId:%d, failed to post block", vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
4,038,251✔
895

896
// Log the event and call syncPostStop() on this vnode's sync instance.
void vnodeSyncPostClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, vnode sync post close", vgId);
  syncPostStop(pVnode->sync);
}
4,039,370✔
900

901
// Log the event and call syncStop() on this vnode's sync instance.
void vnodeSyncClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, vnode close sync", vgId);
  syncStop(pVnode->sync);
}
4,039,494✔
905

906
// Release a blocked vnode whose block has lasted longer than VNODE_TIMEOUT_SEC.
//
// Under pVnode->lock: if the vnode is blocked and more than VNODE_TIMEOUT_SEC
// seconds have passed since pVnode->blockSec, log the timeout, call
// syncSendTimeoutRsp() for the blocked sequence number, reset the block
// bookkeeping and post syncSem. A non-zero result from syncSendTimeoutRsp()
// is currently ignored (the handling code is compiled out).
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    const int32_t nowSec = taosGetTimestampSec();
    const int32_t elapsedSec = nowSec - pVnode->blockSec;

    if (elapsedSec > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, nowSec, elapsedSec, pVnode->blockSeq);

      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
      }

      // Clear the block state before posting the semaphore.
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;

      const int32_t postRet = tsem_post(&pVnode->syncSem);
      if (postRet != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
3,204,236✔
933

934
// Return true when the sync state reported by syncGetState() is
// TAOS_SYNC_STATE_LEADER. Unlike vnodeIsLeader(), this neither touches
// terrno nor checks the restored flags.
bool vnodeIsRoleLeader(SVnode *pVnode) {
  const SSyncState syncState = syncGetState(pVnode->sync);
  const bool       isLeader = (syncState.state == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}
938

939
// Return true only when this vnode is a fully restored leader.
//
// terrno is cleared before querying the sync state. On a false result it is:
//   - whatever syncGetState() set, if that call set it (logged as "stopping");
//   - TSDB_CODE_SYN_NOT_LEADER when the sync state is not leader;
//   - TSDB_CODE_SYN_RESTORING when either the sync state or the vnode itself
//     is not yet marked restored.
bool vnodeIsLeader(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  terrno = 0;
  const SSyncState syncState = syncGetState(pVnode->sync);

  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(syncState.state));
    return false;
  }

  if (!(syncState.restored && pVnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}
962

963
// Return the clusterId recorded for this vnode's own entry
// (nodeInfo[myIndex]) in its sync configuration.
int64_t vnodeClusterId(SVnode *pVnode) {
  const SSyncCfg  *pCfg = &pVnode->config.syncCfg;
  const SNodeInfo *pSelf = &pCfg->nodeInfo[pCfg->myIndex];
  return pSelf->clusterId;
}
967

968
// Return the nodeId recorded for this vnode's own entry
// (nodeInfo[myIndex]) in its sync configuration.
int32_t vnodeNodeId(SVnode *pVnode) {
  const SSyncCfg  *pCfg = &pVnode->config.syncCfg;
  const SNodeInfo *pSelf = &pCfg->nodeInfo[pCfg->myIndex];
  return pSelf->nodeId;
}
972

973
// Fill `pSnap` with the vnode's committed index/term and snapshot state.
//
// lastConfigIndex is always set to -1. The state is SYNC_FSM_STATE_COMPLETE
// unless tsdbSnapGetFsState() reports something other than
// TSDB_FS_STATE_NORMAL, in which case it is SYNC_FSM_STATE_INCOMPLETE.
// For the two PREP_SNAPSHOT message types (read from pSnap->type) the result
// of tsdbSnapPrepDescription() is returned; otherwise 0.
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;
  pSnap->state = SYNC_FSM_STATE_COMPLETE;

  if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
    pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
  }

  const bool isPrepMsg = (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
  if (!isPrepMsg) {
    return 0;
  }
  return tsdbSnapPrepDescription(pVnode, pSnap);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc