taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660 · push · travis-ci · web-flow

feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source scan task to extract the data
belonging to the virtual table from its original tables via the WAL.
The extracted data is then sent to the downstream merge task for further
processing.
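
For illustration only, here is a minimal, self-contained C sketch of that idea. The types and names (SWalRow, srcTableId, sendToMergeTask, the table ids) are invented for the example and are not TDengine's actual structures; the real task reads the vnode's WAL and dispatches the rows to a downstream stream task.

    /* Toy version of the source-scan step: walk WAL records in commit order,
     * keep only rows written to one of the origin tables backing the virtual
     * table, and hand them to the downstream merge task. */
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct {
        int64_t srcTableId; /* origin table the row was written to */
        int64_t ts;         /* row timestamp */
        double  value;      /* payload, reduced to one column */
    } SWalRow;

    /* Origin tables that back the virtual table in this example. */
    static const int64_t originTables[] = {1001, 1003};

    static bool belongsToVirtualTable(int64_t srcTableId) {
        for (size_t i = 0; i < sizeof(originTables) / sizeof(originTables[0]); i++) {
            if (originTables[i] == srcTableId) return true;
        }
        return false;
    }

    /* Stand-in for sending a row to the downstream merge task. */
    static void sendToMergeTask(const SWalRow *pRow) {
        printf("forward ts=%lld table=%lld value=%.1f\n",
               (long long)pRow->ts, (long long)pRow->srcTableId, pRow->value);
    }

    int main(void) {
        /* A few fake WAL records in commit order. */
        const SWalRow wal[] = {
            {1001, 100, 1.0}, {2000, 101, 9.9}, {1003, 102, 2.0},
            {1001, 103, 3.0}, {2002, 104, 8.8},
        };

        for (size_t i = 0; i < sizeof(wal) / sizeof(wal[0]); i++) {
            if (belongsToVirtualTable(wal[i].srcTableId)) {
                sendToMergeTask(&wal[i]); /* rows from other tables are skipped */
            }
        }
        return 0;
    }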

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing. Introduced memory limit handling during the merge process,
with configurable behavior when the memory limit is reached.
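
As a rough illustration of the merge side, the sketch below does a k-way merge of sorted timestamp runs with a loser tree: each internal node stores the loser of its match, so after emitting the current minimum only that run's leaf-to-root path is replayed, costing O(log k) comparisons per row. The runs, struct names, and the absence of any memory-limit handling are simplifications for the example, not the merge task's real interface.

    /* Toy loser-tree k-way merge: combine K sorted timestamp runs
     * (standing in for the rows of K origin tables) into one ordered
     * stream, the way a virtual-table merge task would. */
    #include <stdint.h>
    #include <stdio.h>

    #define K 4 /* number of origin tables feeding the virtual table */

    typedef struct {
        const int64_t *ts;  /* sorted timestamps of one origin table */
        int            len;
        int            pos;
    } SRun;

    /* Key of run r; INT64_MAX once the run is exhausted, so it always loses. */
    static int64_t runKey(const SRun *runs, int r) {
        return runs[r].pos < runs[r].len ? runs[r].ts[runs[r].pos] : INT64_MAX;
    }

    int main(void) {
        static const int64_t t0[] = {1, 5, 9, 13};
        static const int64_t t1[] = {2, 6, 10};
        static const int64_t t2[] = {3, 7, 11, 15, 16};
        static const int64_t t3[] = {4, 8, 12, 14};
        SRun runs[K] = {{t0, 4, 0}, {t1, 3, 0}, {t2, 5, 0}, {t3, 4, 0}};

        /* Build the tree bottom-up: leaves sit at winner[K..2K-1], internal
         * node n keeps the loser of its match, the overall winner is champion. */
        int winner[2 * K], loser[K], champion;
        for (int j = 0; j < K; j++) winner[K + j] = j;
        for (int n = K - 1; n >= 1; n--) {
            int a = winner[2 * n], b = winner[2 * n + 1];
            if (runKey(runs, a) <= runKey(runs, b)) { winner[n] = a; loser[n] = b; }
            else                                    { winner[n] = b; loser[n] = a; }
        }
        champion = winner[1];

        /* Emit the global minimum, advance its run, replay only its path. */
        while (runKey(runs, champion) != INT64_MAX) {
            printf("ts=%lld from origin table %d\n",
                   (long long)runKey(runs, champion), champion);
            runs[champion].pos++;

            int w = champion;
            for (int node = (K + champion) / 2; node >= 1; node /= 2) {
                if (runKey(runs, loser[node]) < runKey(runs, w)) {
                    int tmp = loser[node]; /* stored loser beats the newcomer, */
                    loser[node] = w;       /* so it moves up and w stays behind */
                    w = tmp;
                }
            }
            champion = w;
        }
        return 0;
    }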

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line
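
Since branch coverage is included in the aggregate figure, the headline percentage follows from the two counts above: (240261 + 154078) covered / (318051 + 317582) relevant ≈ 62.039%, matching the 62.039% reported for this build.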

Source File: /source/dnode/vnode/src/vnd/vnodeSync.c (65.78% covered)

1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tqCommon.h"
20
#include "tsdb.h"
21
#include "vnd.h"
22

23
#define BATCH_ENABLE 0
24

25
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
10,374,236✔
26

27
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
2,426✔
28
  const STraceId *trace = &pMsg->info.traceId;
2,426✔
29
  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
2,426!
30
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
31
  if (tsem_wait(&pVnode->syncSem) != 0) {
2,426!
32
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
×
33
  }
34
}
2,426✔
35

36
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
1,346,794✔
37
  if (vnodeIsMsgBlock(pMsg->msgType)) {
1,346,794✔
38
    const STraceId *trace = &pMsg->info.traceId;
18,731✔
39
    (void)taosThreadMutexLock(&pVnode->lock);
18,731✔
40
    if (pVnode->blocked) {
18,734✔
41
      vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
2,407!
42
              TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
43
      pVnode->blocked = false;
2,407✔
44
      pVnode->blockSec = 0;
2,407✔
45
      pVnode->blockSeq = 0;
2,407✔
46
      if (tsem_post(&pVnode->syncSem) != 0) {
2,407!
47
        vError("vgId:%d, failed to post sem", pVnode->config.vgId);
×
48
      }
49
    }
50
    (void)taosThreadMutexUnlock(&pVnode->lock);
18,734✔
51
  }
52
}
1,346,857✔
53

54
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
38,163✔
55
  SEpSet newEpSet = {0};
38,163✔
56
  syncGetRetryEpSet(pVnode->sync, &newEpSet);
38,163✔
57

58
  const STraceId *trace = &pMsg->info.traceId;
38,163✔
59
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
38,163!
60
          newEpSet.numOfEps, newEpSet.inUse);
61
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
149,444✔
62
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
111,281!
63
            newEpSet.eps[i].port);
64
  }
65
  pMsg->info.hasEpSet = 1;
38,163✔
66

67
  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;
38,163!
68

69
  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
38,163✔
70
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);
38,163✔
71

72
  rsp.pCont = rpcMallocCont(contLen);
38,163✔
73
  if (rsp.pCont == NULL) {
38,163!
74
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
75
  } else {
76
    if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
38,163!
77
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
×
78
    }
79
    rsp.contLen = contLen;
38,163✔
80
  }
81

82
  tmsgSendRsp(&rsp);
38,163✔
83
}
38,163✔
84

85
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
10,088,907✔
86
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
10,088,907✔
87
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
10,088,907✔
88
    rsp.code = terrno;
7✔
89
    const STraceId *trace = &pMsg->info.traceId;
×
90
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
×
91
  }
92
  if (rsp.info.handle != NULL) {
10,089,060✔
93
    tmsgSendRsp(&rsp);
10,053,191✔
94
  } else {
95
    if (rsp.pCont) {
35,869✔
96
      rpcFreeCont(rsp.pCont);
25,852✔
97
    }
98
  }
99
}
10,089,223✔
100

101
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
142,341✔
102
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
142,341✔
103
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
29,051✔
104
  } else if (code == TSDB_CODE_MSG_PREPROCESSED) {
113,290✔
105
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
112,372✔
106
    if (rsp.info.handle != NULL) {
112,372!
107
      tmsgSendRsp(&rsp);
112,455✔
108
    }
109
  } else {
110
    const STraceId *trace = &pMsg->info.traceId;
918✔
111
    vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
918!
112
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
918✔
113
    if (rsp.info.handle != NULL) {
918✔
114
      tmsgSendRsp(&rsp);
517✔
115
    }
116
  }
117
}
142,788✔
118

119
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
10,242,843✔
120
  int64_t seq = 0;
10,242,843✔
121

122
  (void)taosThreadMutexLock(&pVnode->lock);
10,242,843✔
123
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
10,243,140✔
124
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
10,242,811✔
125
  if (wait) {
10,242,810✔
126
    if (pVnode->blocked) {
2,426!
127
      return TSDB_CODE_INTERNAL_ERROR;
×
128
    }
129
    pVnode->blocked = true;
2,426✔
130
    pVnode->blockSec = taosGetTimestampSec();
2,426✔
131
    pVnode->blockSeq = seq;
2,426✔
132
  }
133
  (void)taosThreadMutexUnlock(&pVnode->lock);
10,242,810✔
134

135
  if (code > 0) {
10,243,163✔
136
    vnodeHandleWriteMsg(pVnode, pMsg);
10,089,286✔
137
  } else if (code < 0) {
153,877✔
138
    if (terrno != 0) code = terrno;
10,510!
139
    vnodeHandleProposeError(pVnode, pMsg, code);
10,510✔
140
  }
141

142
  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
10,243,119✔
143
  return code;
10,243,118✔
144
}
145

146
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
10,364,825✔
147
  if (!vnodeShouldCommit(pVnode, atExit)) {
10,364,825✔
148
    return;
10,357,744✔
149
  }
150

151
  int32_t   contLen = sizeof(SMsgHead);
7,963✔
152
  SMsgHead *pHead = rpcMallocCont(contLen);
7,963✔
153
  pHead->contLen = contLen;
7,971✔
154
  pHead->vgId = pVnode->config.vgId;
7,971✔
155

156
  SRpcMsg rpcMsg = {0};
7,971✔
157
  rpcMsg.msgType = TDMT_VND_COMMIT;
7,971✔
158
  rpcMsg.contLen = contLen;
7,971✔
159
  rpcMsg.pCont = pHead;
7,971✔
160
  rpcMsg.info.noResp = 1;
7,971✔
161

162
  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
7,971✔
163
  bool isWeak = false;
7,973✔
164

165
  if (!atExit) {
7,973✔
166
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
643!
167
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
×
168
    }
169
    rpcFreeCont(rpcMsg.pCont);
643✔
170
    rpcMsg.pCont = NULL;
643✔
171
  } else {
172
    int32_t code = 0;
7,330✔
173
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
7,330✔
174
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
3,467✔
175
    }
176
  }
177
}
178

179
#if BATCH_ENABLE
180

181
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
182
  if (*arrSize <= 0) return;
183
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
184

185
  (void)taosThreadMutexLock(&pVnode->lock);
186
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
187
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
188
  if (wait) {
189
    pVnode->blocked = true;
190
  }
191
  (void)taosThreadMutexUnlock(&pVnode->lock);
192

193
  if (code > 0) {
194
    for (int32_t i = 0; i < *arrSize; ++i) {
195
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
196
    }
197
  } else if (code < 0) {
198
    if (terrno != 0) code = terrno;
199
    for (int32_t i = 0; i < *arrSize; ++i) {
200
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
201
    }
202
  }
203

204
  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
205
  pLastMsg = NULL;
206

207
  for (int32_t i = 0; i < *arrSize; ++i) {
208
    SRpcMsg        *pMsg = pMsgArr[i];
209
    const STraceId *trace = &pMsg->info.traceId;
210
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
211
    rpcFreeCont(pMsg->pCont);
212
    taosFreeQitem(pMsg);
213
  }
214

215
  *arrSize = 0;
216
}
217

218
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
219
  SVnode   *pVnode = pInfo->ahandle;
220
  int32_t   vgId = pVnode->config.vgId;
221
  int32_t   code = 0;
222
  SRpcMsg  *pMsg = NULL;
223
  int32_t   arrayPos = 0;
224
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
225
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
226
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
227

228
  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
229
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
230
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
231
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);
232

233
    const STraceId *trace = &pMsg->info.traceId;
234
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
235
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
236

237
    if (!pVnode->restored) {
238
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
239
             TMSG_INFO(pMsg->msgType));
240
      terrno = TSDB_CODE_SYN_RESTORING;
241
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
242
      rpcFreeCont(pMsg->pCont);
243
      taosFreeQitem(pMsg);
244
      continue;
245
    }
246

247
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
248
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
249
      terrno = TSDB_CODE_OUT_OF_MEMORY;
250
      vnodeHandleProposeError(pVnode, pMsg, terrno);
251
      rpcFreeCont(pMsg->pCont);
252
      taosFreeQitem(pMsg);
253
      continue;
254
    }
255

256
    bool atExit = false;
257
    vnodeProposeCommitOnNeed(pVnode, atExit);
258

259
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
260
    if (code != 0) {
261
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
262
      rpcFreeCont(pMsg->pCont);
263
      taosFreeQitem(pMsg);
264
      continue;
265
    }
266

267
    if (isBlock) {
268
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
269
    }
270

271
    pMsgArr[arrayPos] = pMsg;
272
    pIsWeakArr[arrayPos] = isWeak;
273
    arrayPos++;
274

275
    if (isBlock || msg == numOfMsgs - 1) {
276
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
277
    }
278
  }
279

280
  taosMemoryFree(pMsgArr);
281
  taosMemoryFree(pIsWeakArr);
282
}
283

284
#else
285

286
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
7,954,232✔
287
  SVnode  *pVnode = pInfo->ahandle;
7,954,232✔
288
  int32_t  vgId = pVnode->config.vgId;
7,954,232✔
289
  int32_t  code = 0;
7,954,232✔
290
  SRpcMsg *pMsg = NULL;
7,954,232✔
291
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
7,954,232✔
292

293
  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
18,329,409✔
294
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
10,374,532!
295
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
10,374,418✔
296

297
    const STraceId *trace = &pMsg->info.traceId;
10,374,189✔
298
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
10,374,189!
299
            vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);
300

301
    if (!pVnode->restored) {
10,374,189✔
302
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
19,145!
303
             TMSG_INFO(pMsg->msgType));
304
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
19,145✔
305
      rpcFreeCont(pMsg->pCont);
19,145✔
306
      taosFreeQitem(pMsg);
19,145✔
307
      continue;
19,145✔
308
    }
309

310
    bool atExit = false;
10,355,044✔
311
    vnodeProposeCommitOnNeed(pVnode, atExit);
10,355,044✔
312

313
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
10,355,653✔
314
    if (code != 0) {
10,354,985✔
315
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
112,752✔
316
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
518!
317
      }
318
      vnodeHandleProposeError(pVnode, pMsg, code);
112,752✔
319
      rpcFreeCont(pMsg->pCont);
113,065✔
320
      taosFreeQitem(pMsg);
113,131✔
321
      continue;
113,209✔
322
    }
323

324
    code = vnodeProposeMsg(pVnode, pMsg, isWeak);
10,242,233✔
325

326
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
10,242,446!
327
    rpcFreeCont(pMsg->pCont);
10,242,446✔
328
    taosFreeQitem(pMsg);
10,242,533✔
329
  }
330
}
7,954,877✔
331

332
#endif
333

334
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
752,852✔
335
  SVnode  *pVnode = pInfo->ahandle;
752,852✔
336
  int32_t  vgId = pVnode->config.vgId;
752,852✔
337
  int32_t  code = 0;
752,852✔
338
  SRpcMsg *pMsg = NULL;
752,852✔
339

340
  for (int32_t i = 0; i < numOfMsgs; ++i) {
2,100,581✔
341
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
1,347,709!
342
    const STraceId *trace = &pMsg->info.traceId;
1,347,633✔
343

344
    if (vnodeIsMsgBlock(pMsg->msgType)) {
1,347,633✔
345
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
18,734!
346
              ", blocking msg obtained sec:%d seq:%" PRId64,
347
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
348
              pVnode->blockSeq);
349
    } else {
350
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
1,328,864!
351
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
352
    }
353

354
    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
1,347,598✔
355
    if (rsp.code == 0) {
1,347,598!
356
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
1,347,915!
357
        rsp.code = terrno;
×
358
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
×
359
                pMsg->info.conn.applyIndex);
360
      }
361
    }
362

363
    vnodePostBlockMsg(pVnode, pMsg);
1,346,543✔
364
    if (rsp.info.handle != NULL) {
1,346,862✔
365
      tmsgSendRsp(&rsp);
143,293✔
366
    } else {
367
      if (rsp.pCont) {
1,203,569✔
368
        rpcFreeCont(rsp.pCont);
1,155,268✔
369
      }
370
    }
371

372
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
1,347,860!
373
    rpcFreeCont(pMsg->pCont);
1,347,860✔
374
    taosFreeQitem(pMsg);
1,347,858✔
375
  }
376
}
752,872✔
377

378
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
2,241,724✔
379
  const STraceId *trace = &pMsg->info.traceId;
2,241,724✔
380
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
2,241,724!
381

382
  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
2,241,724✔
383
  if (code != 0) {
2,241,879✔
384
    vGError("vgId:%d, failed to process sync msg:%p type:%s, reason: %s", pVnode->config.vgId, pMsg,
12!
385
            TMSG_INFO(pMsg->msgType), tstrerror(code));
386
  }
387

388
  return code;
2,241,879✔
389
}
390

391
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
×
392
  if (pMsg == NULL || pMsg->pCont == NULL) {
×
393
    return TSDB_CODE_INVALID_PARA;
×
394
  }
395

396
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
×
397
    rpcFreeCont(pMsg->pCont);
×
398
    pMsg->pCont = NULL;
×
399
    return TSDB_CODE_INVALID_PARA;
×
400
  }
401

402
  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
×
403
  if (code != 0) {
×
404
    rpcFreeCont(pMsg->pCont);
×
405
    pMsg->pCont = NULL;
×
406
  }
407
  return code;
×
408
}
409

410
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
411,488✔
411
  if (pMsg == NULL || pMsg->pCont == NULL) {
411,488!
412
    return TSDB_CODE_INVALID_PARA;
×
413
  }
414

415
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
411,489!
416
    rpcFreeCont(pMsg->pCont);
2✔
417
    pMsg->pCont = NULL;
×
418
    return TSDB_CODE_INVALID_PARA;
×
419
  }
420

421
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
411,487✔
422
  if (code != 0) {
411,487✔
423
    rpcFreeCont(pMsg->pCont);
1,296✔
424
    pMsg->pCont = NULL;
1,294✔
425
  }
426
  return code;
411,485✔
427
}
428

429
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
1,857,073✔
430
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
1,857,073✔
431
  if (code != 0) {
1,857,125!
432
    rpcFreeCont(pMsg->pCont);
×
433
    pMsg->pCont = NULL;
×
434
  }
435
  return code;
1,857,125✔
436
}
437

438
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
497,856✔
439
  return vnodeGetSnapshot(pFsm->data, pSnapshot);
497,856✔
440
}
441

442
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
1,347,825✔
443
  SVnode *pVnode = pFsm->data;
1,347,825✔
444
  pMsg->info.conn.applyIndex = pMeta->index;
1,347,825✔
445
  pMsg->info.conn.applyTerm = pMeta->term;
1,347,825✔
446

447
  const STraceId *trace = &pMsg->info.traceId;
1,347,825✔
448
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
1,347,825!
449
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
450
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
451
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
452

453
  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
1,347,825✔
454
  if (code < 0) vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
1,347,868!
455
  return code;
1,347,866✔
456
}
457

458
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
1,347,830✔
459
  if (pMsg->code == 0) {
1,347,830!
460
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
1,347,840✔
461
  }
462

463
  const STraceId *trace = &pMsg->info.traceId;
×
464
  SVnode         *pVnode = pFsm->data;
×
465
  vnodePostBlockMsg(pVnode, pMsg);
×
466

467
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
×
468
  if (rsp.info.handle != NULL) {
×
469
    tmsgSendRsp(&rsp);
×
470
  }
471

472
  vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
×
473
  rpcFreeCont(pMsg->pCont);
×
474
  pMsg->pCont = NULL;
×
475
  return 0;
×
476
}
477

478
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
×
479
  if (pMeta->isWeak == 1) {
×
480
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
×
481
  }
482
  return 0;
×
483
}
484

485
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
10,492,555✔
486
  SVnode *pVnode = pFSM->data;
10,492,555✔
487
  return atomic_load_64(&pVnode->state.applied);
10,492,555✔
488
}
489

490
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
×
491
  SVnode *pVnode = pFsm->data;
×
492
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
×
493
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
494
         TMSG_INFO(pMsg->msgType));
495
}
×
496

497
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
51✔
498
  SVnode *pVnode = pFsm->data;
51✔
499
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
51✔
500
}
501

502
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
51✔
503
  SVnode *pVnode = pFsm->data;
51✔
504
  vnodeSnapReaderClose(pReader);
51✔
505
}
51✔
506

507
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
6,755✔
508
  SVnode *pVnode = pFsm->data;
6,755✔
509
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
6,755✔
510
}
511

512
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
49✔
513
  SVnode *pVnode = pFsm->data;
49✔
514

515
  do {
×
516
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
49✔
517
    if (itemSize == 0) {
49!
518
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
49!
519
      break;
49✔
520
    } else {
521
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
522
      taosMsleep(10);
×
523
    }
524
  } while (true);
525

526
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
49✔
527
}
528

529
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
49✔
530
  SVnode *pVnode = pFsm->data;
49✔
531
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
49!
532
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
533

534
  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
49✔
535
  if (code != 0) {
49!
536
    vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
×
537
           code);
538
  }
539
  return code;
49✔
540
}
541

542
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
6,602✔
543
  SVnode *pVnode = pFsm->data;
6,602✔
544
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
6,602!
545
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
6,602✔
546
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
6,602!
547
  return code;
6,602✔
548
}
549

550
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
11,837✔
551
  SVnode   *pVnode = pFsm->data;
11,837✔
552
  int32_t   vgId = pVnode->config.vgId;
11,837✔
553
  SyncIndex appliedIdx = -1;
11,837✔
554

555
  do {
556
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
30,224✔
557
    if (appliedIdx > commitIdx) {
30,236!
558
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
×
559
             appliedIdx, commitIdx);
560
      break;
×
561
    }
562
    if (appliedIdx == commitIdx) {
30,236✔
563
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
11,837✔
564
      break;
11,837✔
565
    } else {
566
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
18,399✔
567
            ", applied-index:%" PRId64,
568
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
569
      taosMsleep(10);
18,399✔
570
    }
571
  } while (true);
572

573
  walApplyVer(pVnode->pWal, commitIdx);
11,837✔
574
  pVnode->restored = true;
11,837✔
575

576
#ifdef USE_STREAM
577
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
11,837✔
578
  streamMetaWLock(pMeta);
11,837✔
579

580
  if (pMeta->startInfo.tasksWillRestart) {
11,837!
581
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
×
582
    streamMetaWUnLock(pMeta);
×
583
    return;
×
584
  }
585

586
  if (vnodeIsRoleLeader(pVnode)) {
11,837✔
587
    // start to restore all stream tasks
588
    if (tsDisableStream) {
9,990!
589
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
×
590
    } else {
591
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
9,990✔
592
      if (pMeta->startInfo.startAllTasks == 1) {
9,990!
593
        pMeta->startInfo.restartCount += 1;
×
594
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
×
595
               pMeta->startInfo.restartCount);
596
      } else {
597
        pMeta->startInfo.startAllTasks = 1;
9,990✔
598
        streamMetaWUnLock(pMeta);
9,990✔
599

600
        tqInfo("vgId:%d stream task already loaded, start them", vgId);
9,990!
601
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
9,990✔
602
        if (code != 0) {
9,987!
603
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
×
604
        }
605
        return;
9,987✔
606
      }
607
    }
608
  } else {
609
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
1,847!
610
  }
611

612
  streamMetaWUnLock(pMeta);
1,847✔
613
#endif
614
}
615

616
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
4,599✔
617
  SVnode *pVnode = pFsm->data;
4,599✔
618
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
4,599!
619

620
  (void)taosThreadMutexLock(&pVnode->lock);
4,599✔
621
  if (pVnode->blocked) {
4,599!
622
    pVnode->blocked = false;
×
623
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
624
    if (tsem_post(&pVnode->syncSem) != 0) {
×
625
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
626
    }
627
  }
628
  (void)taosThreadMutexUnlock(&pVnode->lock);
4,599✔
629

630
#ifdef USE_TQ
631
  if (pVnode->pTq) {
4,599!
632
    tqUpdateNodeStage(pVnode->pTq, false);
4,599✔
633
    if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) {
4,599!
634
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
×
635
    }
636
  }
637
#endif
638
}
4,599✔
639

640
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
160✔
641
  SVnode *pVnode = pFsm->data;
160✔
642
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
160!
643

644
  (void)taosThreadMutexLock(&pVnode->lock);
160✔
645
  if (pVnode->blocked) {
160!
646
    pVnode->blocked = false;
×
647
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
648
    if (tsem_post(&pVnode->syncSem) != 0) {
×
649
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
650
    }
651
  }
652
  (void)taosThreadMutexUnlock(&pVnode->lock);
160✔
653
}
160✔
654

655
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
9,988✔
656
  SVnode *pVnode = pFsm->data;
9,988✔
657
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
9,988✔
658
#ifdef USE_TQ
659
  if (pVnode->pTq) {
9,988!
660
    tqUpdateNodeStage(pVnode->pTq, true);
9,990✔
661
  }
662
#endif
663
}
9,986✔
664

665
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
666
  SVnode *pVnode = pFsm->data;
×
667
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
668
#ifdef USE_TQ
669
  if (pVnode->pTq) {
×
670
    tqUpdateNodeStage(pVnode->pTq, true);
×
671
  }
672
#endif
673
}
×
674

675
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
676
  SVnode *pVnode = pFsm->data;
×
677

678
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
679
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
680
    return (itemSize == 0);
×
681
  } else {
682
    return true;
×
683
  }
684
}
685

686
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
219,871✔
687
  SVnode *pVnode = pFsm->data;
219,871✔
688

689
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
219,871!
690
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
219,889✔
691
    return itemSize;
219,880✔
692
  } else {
693
    return TSDB_CODE_INVALID_PARA;
×
694
  }
695
}
696

697
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
11,907✔
698
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
11,907!
699
  if (pFsm == NULL) {
11,906!
700
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
701
    return NULL;
×
702
  }
703
  pFsm->data = pVnode;
11,906✔
704
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
11,906✔
705
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
11,906✔
706
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
11,906✔
707
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
11,906✔
708
  pFsm->FpGetSnapshot = NULL;
11,906✔
709
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
11,906✔
710
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
11,906✔
711
  pFsm->FpAfterRestoredCb = NULL;
11,906✔
712
  pFsm->FpLeaderTransferCb = NULL;
11,906✔
713
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
11,906✔
714
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
11,906✔
715
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
11,906✔
716
  pFsm->FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader;
11,906✔
717
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
11,906✔
718
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;
11,906✔
719
  pFsm->FpReConfigCb = NULL;
11,906✔
720
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
11,906✔
721
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
11,906✔
722
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
11,906✔
723
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
11,906✔
724
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
11,906✔
725
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;
11,906✔
726

727
  return pFsm;
11,906✔
728
}
729

730
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
11,902✔
731
  SSyncInfo syncInfo = {
11,902✔
732
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
733
      .batchSize = 1,
734
      .vgId = pVnode->config.vgId,
11,902✔
735
      .syncCfg = pVnode->config.syncCfg,
736
      .pWal = pVnode->pWal,
11,902✔
737
      .msgcb = &pVnode->msgCb,
11,902✔
738
      .syncSendMSg = vnodeSyncSendMsg,
739
      .syncEqMsg = vnodeSyncEqMsg,
740
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
741
      .pingMs = 5000,
742
      .electMs = 4000,
743
      .heartbeatMs = 700,
744
  };
745

746
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
11,902✔
747
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
11,902✔
748

749
  SSyncCfg *pCfg = &syncInfo.syncCfg;
11,906✔
750
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
11,906✔
751
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
29,873✔
752
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
17,966✔
753
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
17,966✔
754
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
755
  }
756

757
  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
11,907✔
758
  if (pVnode->sync <= 0) {
11,907!
759
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
×
760
    return terrno;
×
761
  }
762

763
  return 0;
11,907✔
764
}
765

766
int32_t vnodeSyncStart(SVnode *pVnode) {
11,907✔
767
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
11,907✔
768
  int32_t code = syncStart(pVnode->sync);
11,907✔
769
  if (code) {
11,907!
770
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
×
771
    return code;
×
772
  }
773
  return 0;
11,907✔
774
}
775

776
void vnodeSyncPreClose(SVnode *pVnode) {
11,906✔
777
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
11,906✔
778
  int32_t code = syncLeaderTransfer(pVnode->sync);
11,907✔
779
  if (code) {
11,907✔
780
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(code));
884!
781
  }
782
  syncPreStop(pVnode->sync);
11,907✔
783

784
  (void)taosThreadMutexLock(&pVnode->lock);
11,907✔
785
  if (pVnode->blocked) {
11,907✔
786
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
19!
787
    pVnode->blocked = false;
19✔
788
    if (tsem_post(&pVnode->syncSem) != 0) {
19!
789
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
×
790
    }
791
  }
792
  (void)taosThreadMutexUnlock(&pVnode->lock);
11,907✔
793
}
11,904✔
794

795
void vnodeSyncPostClose(SVnode *pVnode) {
11,907✔
796
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
11,907✔
797
  syncPostStop(pVnode->sync);
11,907✔
798
}
11,901✔
799

800
void vnodeSyncClose(SVnode *pVnode) {
11,906✔
801
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
11,906✔
802
  syncStop(pVnode->sync);
11,907✔
803
}
11,903✔
804

805
void vnodeSyncCheckTimeout(SVnode *pVnode) {
37,312✔
806
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
37,312✔
807
  (void)taosThreadMutexLock(&pVnode->lock);
37,312✔
808
  if (pVnode->blocked) {
37,312!
UNCOV
809
    int32_t curSec = taosGetTimestampSec();
×
UNCOV
810
    int32_t delta = curSec - pVnode->blockSec;
×
UNCOV
811
    if (delta > VNODE_TIMEOUT_SEC) {
×
812
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
×
813
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
814
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
×
815
#if 0
816
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
817
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
818
              rpcMsg.info.handle, rpcMsg.info.ahandle);
819
        rpcSendResponse(&rpcMsg);
820
#endif
821
      }
822
      pVnode->blocked = false;
×
823
      pVnode->blockSec = 0;
×
824
      pVnode->blockSeq = 0;
×
825
      if (tsem_post(&pVnode->syncSem) != 0) {
×
826
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
×
827
      }
828
    }
829
  }
830
  (void)taosThreadMutexUnlock(&pVnode->lock);
37,312✔
831
}
37,312✔
832

833
bool vnodeIsRoleLeader(SVnode *pVnode) {
156,795✔
834
  SSyncState state = syncGetState(pVnode->sync);
156,795✔
835
  return state.state == TAOS_SYNC_STATE_LEADER;
156,819✔
836
}
837

838
bool vnodeIsLeader(SVnode *pVnode) {
20,136✔
839
  terrno = 0;
20,136✔
840
  SSyncState state = syncGetState(pVnode->sync);
20,152✔
841

842
  if (terrno != 0) {
20,164✔
843
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
2,070!
844
    return false;
2,070✔
845
  }
846

847
  if (state.state != TAOS_SYNC_STATE_LEADER) {
18,075!
848
    terrno = TSDB_CODE_SYN_NOT_LEADER;
×
849
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
×
850
    return false;
×
851
  }
852

853
  if (!state.restored || !pVnode->restored) {
18,075✔
854
    terrno = TSDB_CODE_SYN_RESTORING;
5✔
855
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
×
856
    return false;
×
857
  }
858

859
  return true;
18,070✔
860
}
861

862
int64_t vnodeClusterId(SVnode *pVnode) {
×
863
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
×
864
  return syncCfg->nodeInfo[syncCfg->myIndex].clusterId;
×
865
}
866

867
int32_t vnodeNodeId(SVnode *pVnode) {
821,700✔
868
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
821,700✔
869
  return syncCfg->nodeInfo[syncCfg->myIndex].nodeId;
821,700✔
870
}
871

872
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
497,902✔
873
  int code = 0;
497,902✔
874
  pSnap->lastApplyIndex = pVnode->state.committed;
497,902✔
875
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
497,902✔
876
  pSnap->lastConfigIndex = -1;
497,902✔
877
  pSnap->state = SYNC_FSM_STATE_COMPLETE;
497,902✔
878

879
  if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
497,902!
880
    pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
×
881
  }
882

883
  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
497,919✔
884
    code = tsdbSnapPrepDescription(pVnode, pSnap);
97✔
885
  }
886
  return code;
497,923✔
887
}