• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.18
/source/dnode/mnode/impl/src/mndMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndAcct.h"
18
#include "mndAnode.h"
19
#include "mndArbGroup.h"
20
#include "mndCluster.h"
21
#include "mndCompact.h"
22
#include "mndCompactDetail.h"
23
#include "mndConfig.h"
24
#include "mndConsumer.h"
25
#include "mndDb.h"
26
#include "mndDnode.h"
27
#include "mndFunc.h"
28
#include "mndGrant.h"
29
#include "mndIndex.h"
30
#include "mndInfoSchema.h"
31
#include "mndMnode.h"
32
#include "mndPerfSchema.h"
33
#include "mndPrivilege.h"
34
#include "mndProfile.h"
35
#include "mndQnode.h"
36
#include "mndQuery.h"
37
#include "mndShow.h"
38
#include "mndSma.h"
39
#include "mndSnode.h"
40
#include "mndStb.h"
41
#include "mndStream.h"
42
#include "mndSubscribe.h"
43
#include "mndSync.h"
44
#include "mndTelem.h"
45
#include "mndTopic.h"
46
#include "mndTrans.h"
47
#include "mndUser.h"
48
#include "mndVgroup.h"
49
#include "mndView.h"
50

51
/*
 * Take one reference on the mnode rpc counter.
 *
 * Under the mnode read lock:
 *   - returns TSDB_CODE_APP_IS_STOPPING when the mnode is flagged as stopped,
 *   - returns 1 when this mnode is not the leader,
 *   - otherwise increments pMnode->rpcRef and returns 0.
 * A successful call is paired with mndReleaseRpc().
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = 0;

  (void)taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    result = TSDB_CODE_APP_IS_STOPPING;
  } else if (mndIsLeader(pMnode)) {
#if 0
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
#else
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
#endif
  } else {
    result = 1;
  }
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  TAOS_RETURN(result);
}
69

70
/*
 * Drop one reference from the mnode rpc counter.
 * The decrement is performed while holding the mnode read lock.
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  (void)taosThreadRwlockRdlock(&pMnode->lock);
#if 0
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", ref);
#else
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
#endif
  (void)taosThreadRwlockUnlock(&pMnode->lock);
}
4,797,645✔
80

81
static void *mndBuildTimerMsg(int32_t *pContLen) {
162,355✔
82
  terrno = 0;
162,355✔
83
  SMTimerReq timerReq = {0};
162,355✔
84

85
  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
162,355✔
86
  if (contLen <= 0) return NULL;
162,355!
87
  void *pReq = rpcMallocCont(contLen);
162,355✔
88
  if (pReq == NULL) return NULL;
162,355!
89

90
  if (tSerializeSMTimerMsg(pReq, contLen, &timerReq) < 0) {
162,355!
91
    mError("failed to serialize timer msg since %s", terrstr());
×
92
  }
93
  *pContLen = contLen;
162,355✔
94
  return pReq;
162,355✔
95
}
96

97
/*
 * Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndPullupTrans(SMnode *pMnode) {
  mTrace("pullup trans msg");

  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
34,939✔
109

110
/*
 * Post a TDMT_MND_COMPACT_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndPullupCompacts(SMnode *pMnode) {
  mTrace("pullup compact timer msg");

  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
6,346✔
122

123
/*
 * Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built (same policy as
 * the other pullup helpers in this file); an enqueue failure is only logged.
 */
static void mndPullupTtl(SMnode *pMnode) {
  mTrace("pullup ttl");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;  // do not enqueue a message without content

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
6,875✔
133

134
/*
 * Post a TDMT_MND_TRIM_DB_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndPullupTrimDb(SMnode *pMnode) {
  mTrace("pullup trim");  // trace text now matches the trim-db timer this function posts
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;  // do not enqueue a message without content

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
5✔
144

145
/*
 * Post a TDMT_MND_S3MIGRATE_DB_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndPullupS3MigrateDb(SMnode *pMnode) {
  mTrace("pullup s3migrate");  // trace text now matches the s3migrate timer this function posts
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;  // do not enqueue a message without content

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_S3MIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
×
155

156
/*
 * Post a TDMT_MND_ARB_HEARTBEAT_TIMER message (flagged noResp) to the arb queue.
 *
 * Returns the result of tmsgPutToQueue(). When the timer message cannot be
 * built, nothing is enqueued and a non-zero code is returned instead: terrno
 * if it is set, otherwise TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t mndPullupArbHeartbeat(SMnode *pMnode) {
  mTrace("pullup arb hb");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_HEARTBEAT_TIMER, .pCont = pReq, .contLen = contLen, .info.noResp = 1};
  return tmsgPutToQueue(&pMnode->msgCb, ARB_QUEUE, &rpcMsg);
}
163

164
/*
 * Post a TDMT_MND_ARB_CHECK_SYNC_TIMER message (flagged noResp) to the arb queue.
 *
 * Returns the result of tmsgPutToQueue(). When the timer message cannot be
 * built, nothing is enqueued and a non-zero code is returned instead: terrno
 * if it is set, otherwise TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t mndPullupArbCheckSync(SMnode *pMnode) {
  mTrace("pullup arb sync");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_CHECK_SYNC_TIMER, .pCont = pReq, .contLen = contLen, .info.noResp = 1};
  return tmsgPutToQueue(&pMnode->msgCb, ARB_QUEUE, &rpcMsg);
}
171

172
/*
 * Post a TDMT_MND_TMQ_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
34,133✔
182

183
/*
 * Post a TDMT_MND_STREAM_BEGIN_CHECKPOINT message to the mnode write queue.
 * The payload is a buffer of sizeof(SMStreamDoCheckpointMsg) bytes obtained
 * from rpcMallocCont(); nothing is posted if that allocation fails. An
 * enqueue failure is only logged.
 */
static void mndStreamCheckpointTimer(SMnode *pMnode) {
  SMStreamDoCheckpointMsg *pCheckpoint = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
  if (pCheckpoint == NULL) return;

  int32_t msgLen = sizeof(SMStreamDoCheckpointMsg);
  SRpcMsg timerMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pCheckpoint, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
1,860✔
194

195
/*
 * Post a TDMT_MND_NODECHECK_TIMER message to the mnode read queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndStreamCheckNode(SMnode *pMnode) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &timerMsg) < 0) {
    mError("failed to put into read-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
2,961✔
206

207
/*
 * Post a TDMT_MND_CHECK_STREAM_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndStreamCheckStatus(SMnode *pMnode) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_CHECK_STREAM_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
11✔
218

219
/*
 * Post a TDMT_MND_STREAM_CONSEN_TIMER message to the mnode write queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndStreamConsensusChkpt(SMnode *pMnode) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
13,589✔
230

231
/*
 * Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
 * Nothing is posted when the timer message cannot be built; an enqueue
 * failure is only logged.
 */
static void mndPullupTelem(SMnode *pMnode) {
  mTrace("pullup telem msg");

  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pMsg, .contLen = msgLen};
  if (tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &timerMsg) < 0) {
    mError("failed to put into read-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
2✔
243

244
/*
 * Post a TDMT_MND_GRANT_HB_TIMER message to the mnode write queue.
 * The message carries a zero ahandle with notFreeAhandle set. Nothing is
 * posted when the timer message cannot be built; an enqueue failure is only
 * logged.
 */
static void mndPullupGrant(SMnode *pMnode) {
  mTrace("pullup grant msg");

  int32_t msgLen = 0;
  void   *pMsg = mndBuildTimerMsg(&msgLen);
  if (pMsg == NULL) return;

  SRpcMsg timerMsg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER,
      .pCont = pMsg,
      .contLen = msgLen,
      .info.notFreeAhandle = 1,
      .info.ahandle = 0,
  };
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
6,073✔
260

261
/*
 * Post a TDMT_MND_UPTIME_TIMER message to the mnode write queue.
 * The message carries a zero ahandle with notFreeAhandle set. Nothing is
 * posted when the timer message cannot be built; an enqueue failure is only
 * logged.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  mTrace("increase uptime");  // fixed typo: was "increate"
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPTIME_TIMER,
                      .pCont = pReq,
                      .contLen = contLen,
                      .info.notFreeAhandle = 1,
                      .info.ahandle = 0};
    if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
      mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
    }
  }
}
100✔
277

278
/*
 * Mark every vnode replica hosted on `dnodeId` as offline.
 *
 * Walks all SDB_VGROUP objects. For the replica of each vgroup that lives on
 * the given dnode and is not already TAOS_SYNC_STATE_OFFLINE, the sync state
 * is set to offline and syncRestore / syncCanRead / startTimeMs are cleared.
 * When a vgroup was changed, the owning database's stateTs is set to `curMs`
 * (if it differs).
 */
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    bool stateChanged = false;
    for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
      SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
      if (pGid->dnodeId == dnodeId) {
        if (pGid->syncState != TAOS_SYNC_STATE_OFFLINE) {
          // The log names the state that is actually assigned below (offline).
          mInfo(
              "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:offline "
              "restored:0 canRead:0",
              pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead);
          pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
          pGid->syncRestore = 0;
          pGid->syncCanRead = 0;
          pGid->startTimeMs = 0;
          stateChanged = true;
        }
        // Only the first replica matching this dnode is examined per vgroup.
        break;
      }
    }

    if (stateChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb != NULL && pDb->stateTs != curMs) {
        mInfo("db:%s, stateTs changed by offline check, old stateTs:%" PRId64 " newTs:%" PRId64, pDb->name,
              pDb->stateTs, curMs);
        pDb->stateTs = curMs;
      }
      mndReleaseDb(pMnode, pDb);
    }

    sdbRelease(pSdb, pVgroup);
  }
}
223✔
319

320
/*
 * Scan all dnodes and push the vgroups of every dnode that is not online
 * (per mndIsDnodeOnline at the current timestamp) into the offline state.
 * Does nothing when an rpc reference cannot be acquired via mndAcquireRpc();
 * otherwise the reference is held for the whole scan and released at the end.
 */
static void mndCheckDnodeOffline(SMnode *pMnode) {
  mTrace("check dnode offline");
  if (mndAcquireRpc(pMnode) != 0) return;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t nowMs = taosGetTimestampMs();

  void *pCursor = NULL;
  for (;;) {
    SDnodeObj *pDnode = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (pCursor == NULL) break;

    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }

    sdbRelease(pSdb, pDnode);
  }

  mndReleaseRpc(pMnode);
}
344

345
/*
 * Report whether this mnode must NOT act as leader right now.
 *
 * Returns true when:
 *   - syncGetState() left terrno non-zero (terrno is kept as set by that call),
 *   - the sync state is not leader (terrno = TSDB_CODE_SYN_NOT_LEADER),
 *   - either the sync state or the mnode is not restored
 *     (terrno = TSDB_CODE_SYN_RESTORING).
 * Returns false otherwise. The state is sampled under the mnode read lock.
 */
static bool mnodeIsNotLeader(SMnode *pMnode) {
  terrno = 0;

  (void)taosThreadRwlockRdlock(&pMnode->lock);
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);
  const bool queryFailed = (terrno != 0);
  const bool isLeader = (syncState.state == TAOS_SYNC_STATE_LEADER);
  const bool isRestored = (syncState.restored && pMnode->restored);
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  if (queryFailed) {
    return true;
  }
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return true;
  }
  if (!isRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    return true;
  }
  return false;
}
367

368
static int32_t minCronTime() {
72,928✔
369
  int32_t min = INT32_MAX;
72,928✔
370
  min = TMIN(min, tsTtlPushIntervalSec);
72,928✔
371
  min = TMIN(min, tsTrimVDbIntervalSec);
72,928✔
372
  min = TMIN(min, tsS3MigrateIntervalSec);
72,928✔
373
  min = TMIN(min, tsTransPullupInterval);
72,928✔
374
  min = TMIN(min, tsCompactPullupInterval);
72,928✔
375
  min = TMIN(min, tsMqRebalanceInterval);
72,928✔
376
  min = TMIN(min, tsStreamCheckpointInterval);
72,928✔
377
  min = TMIN(min, tsStreamNodeCheckInterval);
72,928✔
378
  min = TMIN(min, tsArbHeartBeatIntervalSec);
72,928✔
379
  min = TMIN(min, tsArbCheckSyncIntervalSec);
72,928✔
380

381
  int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
72,928✔
382
  min = TMIN(min, telemInt);
72,928✔
383
  min = TMIN(min, tsGrantHBInterval);
72,928✔
384
  min = TMIN(min, tsUptimeInterval);
72,928✔
385

386
  return min <= 1 ? 2 : min;
72,928✔
387
}
388
/*
 * True when `interval` is a usable (positive) period and `sec` is a multiple
 * of it. A zero or negative interval never fires, which also avoids a modulo
 * by zero.
 */
static inline bool mndIsTimerDue(int64_t sec, int64_t interval) {
  return interval > 0 && sec % interval == 0;
}

/*
 * Fire every periodic pullup whose interval divides `sec`.
 *
 * pMnode: mnode whose queues receive the timer messages.
 * sec:    elapsed seconds counter supplied by the timer thread.
 *
 * Which timers exist depends on the build flags (TD_ASTRA, USE_S3, USE_TOPIC,
 * USE_STREAM, USE_REPORT). Failures of the arb pullups are logged; the other
 * pullup helpers handle their own errors.
 */
void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
  int32_t code = 0;
#ifndef TD_ASTRA
  if (mndIsTimerDue(sec, tsTtlPushIntervalSec)) {
    mndPullupTtl(pMnode);
  }

  if (mndIsTimerDue(sec, tsTrimVDbIntervalSec)) {
    mndPullupTrimDb(pMnode);
  }
#endif
#ifdef USE_S3
  if (tsS3MigrateEnabled && mndIsTimerDue(sec, tsS3MigrateIntervalSec)) {
    mndPullupS3MigrateDb(pMnode);
  }
#endif
  if (mndIsTimerDue(sec, tsTransPullupInterval)) {
    mndPullupTrans(pMnode);
  }

  if (mndIsTimerDue(sec, tsCompactPullupInterval)) {
    mndPullupCompacts(pMnode);
  }
#ifdef USE_TOPIC
  if (mndIsTimerDue(sec, tsMqRebalanceInterval)) {
    mndCalMqRebalance(pMnode);
  }
#endif
#ifdef USE_STREAM
  if (sec % 30 == 0) {  // send the checkpoint info every 30 sec
    mndStreamCheckpointTimer(pMnode);
  }

  if (mndIsTimerDue(sec, tsStreamNodeCheckInterval)) {
    mndStreamCheckNode(pMnode);
  }

  // tsStreamFailedTimeout is divided by 1000 here; a value below 1000 yields 0 and is skipped.
  if (mndIsTimerDue(sec, tsStreamFailedTimeout / 1000)) {
    mndStreamCheckStatus(pMnode);
  }

  if (sec % 5 == 0) {
    mndStreamConsensusChkpt(pMnode);
  }
#endif
#ifdef USE_REPORT
  // Telemetry fires at a fixed offset inside each tsTelemInterval window rather than at its start.
  if (tsTelemInterval > 0 && sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) {
    mndPullupTelem(pMnode);
  }
#endif
#ifndef TD_ASTRA
  if (mndIsTimerDue(sec, tsGrantHBInterval)) {
    mndPullupGrant(pMnode);
  }
#endif
  if (mndIsTimerDue(sec, tsUptimeInterval)) {
    mndIncreaseUpTime(pMnode);
  }
#ifndef TD_ASTRA
  if (mndIsTimerDue(sec, tsArbHeartBeatIntervalSec)) {
    if ((code = mndPullupArbHeartbeat(pMnode)) != 0) {
      mError("failed to pullup arb heartbeat, since:%s", tstrerror(code));
    }
  }

  if (mndIsTimerDue(sec, tsArbCheckSyncIntervalSec)) {
    if ((code = mndPullupArbCheckSync(pMnode)) != 0) {
      mError("failed to pullup arb check sync, since:%s", tstrerror(code));
    }
  }
#endif
}
71,008✔
460
/*
 * Run the periodic health checks that are due at second `sec`:
 *   - the dnode offline check every (tsStatusInterval * 5) seconds,
 *   - the sync timeout check every (MNODE_TIMEOUT_SEC / 2) seconds.
 */
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
  const bool offlineCheckDue = (sec % (tsStatusInterval * 5) == 0);
  if (offlineCheckDue) {
    mndCheckDnodeOffline(pMnode);
  }

  const bool syncCheckDue = (sec % (MNODE_TIMEOUT_SEC / 2) == 0);
  if (syncCheckDue) {
    mndSyncCheckTimeout(pMnode);
  }
}
72,928✔
468

469
static void *mndThreadFp(void *param) {
1,747✔
470
  SMnode *pMnode = param;
1,747✔
471
  int64_t lastTime = 0;
1,747✔
472
  setThreadName("mnode-timer");
1,747✔
473

474
  while (1) {
737,247✔
475
    lastTime++;
738,994✔
476
    taosMsleep(100);
738,994✔
477
    if (mndGetStop(pMnode)) break;
738,994✔
478
    if (lastTime % 10 != 0) continue;
737,247✔
479

480
    int64_t sec = lastTime / 10;
72,928✔
481
    mndDoTimerCheckTask(pMnode, sec);
72,928✔
482

483
    int64_t minCron = minCronTime();
72,928✔
484
    if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
72,928✔
485
      // not leader, do nothing
486
      mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno));
1,920!
487
      terrno = 0;
1,920✔
488
      continue;
1,920✔
489
    }
490
    mndDoTimerPullupTask(pMnode, sec);
71,008✔
491
  }
492

493
  return NULL;
1,747✔
494
}
495

496
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * Returns 0 on success, otherwise the error code from taosThreadCreate().
 * The thread attribute object is destroyed on both the success and the
 * failure path.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif

  code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  // The attribute object is no longer needed once creation has been attempted.
  (void)taosThreadAttrDestroy(&thAttr);
  if (code != 0) {
    mError("failed to create timer thread since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  tmsgReportStartup("mnode-timer", "initialized");
  TAOS_RETURN(code);
}
513

514
/*
 * Join the timer thread and clear its handle.
 * No-op when the stored thread handle is not valid.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  (void)taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}
1,747✔
520

521
/*
 * Store a copy of `path` in pMnode->path and create that directory.
 * Returns 0 on success, or terrno when the string copy or the directory
 * creation fails.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = taosStrdup(path);
  if (pMnode->path == NULL) {
    TAOS_RETURN(terrno);
  }

  if (taosMkDir(pMnode->path) != 0) {
    TAOS_RETURN(terrno);
  }

  TAOS_RETURN(0);
}
536

537
/*
 * Open the mnode WAL located at "<pMnode->path><TD_DIRSEP>wal" and store the
 * handle in pMnode->pWal.
 *
 * In TD_ENTERPRISE / TD_ASTRA_TODO builds, when SM4 encryption is selected and
 * the scope includes the mnode WAL, the encryption key is copied into the WAL
 * config; an empty key yields TSDB_CODE_DNODE_INVALID_ENCRYPTKEY.
 *
 * Returns 0 on success. When walOpen() fails, returns terrno if it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  (void)snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .committed = -1,
      .retentionPeriod = 0,
      .retentionSize = 0,
      .level = TAOS_WAL_FSYNC,
      .encryptAlgorithm = 0,
      .encryptKey = {0},
  };

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL) {
    if (tsEncryptKey[0] == '\0') {
      TAOS_RETURN(TSDB_CODE_DNODE_INVALID_ENCRYPTKEY);
    }
    cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL) ? tsiEncryptAlgorithm : 0;
    tstrncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#endif

  pMnode->pWal = walOpen(walPath, &cfg);
  if (pMnode->pWal == NULL) {
    int32_t openErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("failed to open wal since %s. wal:%s", tstrerror(openErr), walPath);
    TAOS_RETURN(openErr);
  }

  TAOS_RETURN(0);
}
574

575
/*
 * Close the mnode WAL, if one is open, and reset the handle to NULL.
 */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}
1,747✔
581

582
/*
 * Create the sdb instance for this mnode from its path and WAL, storing it in
 * pMnode->pSdb.
 *
 * Returns 0 on success. When sdbInit() fails, returns terrno if it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  if (pMnode->pSdb != NULL) {
    TAOS_RETURN(0);
  }

  int32_t initErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(initErr);
}
598

599
/*
 * Load the sdb content and publish its commit index as the applied index.
 *
 * When the mnode is not being deployed, the sdb file is read first. If that
 * read fails, the error is logged and returned without reporting the sdb as
 * opened and without touching pMnode->applied.
 *
 * Returns 0 on success, otherwise the error code from sdbReadFile().
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  int32_t code = 0;
  if (!pMnode->deploy) {
    code = sdbReadFile(pMnode->pSdb);
    if (code != 0) {
      mError("vgId:1, failed to read mnode sdb file since %s", tstrerror(code));
      return code;
    }
  }

  mInfo("vgId:1, mnode sdb is opened, with applied index:%" PRId64, pMnode->pSdb->commitIndex);

  atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex);
  return code;
}
610

611
/*
 * Release the sdb instance, if any, and reset the handle to NULL.
 */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}
1,747✔
617

618
/*
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * name:      label used in logs for this step.
 * initFp:    function run by mndExecSteps().
 * cleanupFp: function run by mndCleanupSteps(); may be NULL.
 *
 * Returns 0 on success, or terrno when the array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    TAOS_RETURN(0);
  }

  TAOS_RETURN(terrno);
}
629

630
/*
 * Register every mnode start-up step, in execution order, via mndAllocStep().
 * Steps are later run front-to-back by mndExecSteps() and cleaned up
 * back-to-front by mndCleanupSteps(). Stops at the first registration failure
 * and returns its code; returns 0 when all steps were registered.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  // Order matters: entries are registered exactly in the sequence listed here.
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } kSteps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-anode", mndInitAnode, mndCleanupAnode},
      {"mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup},
      {"mnode-config", mndInitConfig, NULL},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-idx", mndInitIdx, mndCleanupIdx},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-view", mndInitView, mndCleanupView},
      {"mnode-compact", mndInitCompact, mndCleanupCompact},
      {"mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const int32_t numSteps = (int32_t)(sizeof(kSteps) / sizeof(kSteps[0]));
  for (int32_t i = 0; i < numSteps; ++i) {
    TAOS_CHECK_RETURN(mndAllocStep(pMnode, kSteps[i].name, kSteps[i].initFp, kSteps[i].cleanupFp));
  }
  return 0;
}
669

670
/*
 * Run the cleanup functions of the registered steps in reverse order, then
 * destroy the step array and set pMnode->pSteps to NULL.
 *
 * pos: index of the last step to clean up (inclusive); -1 means "start from
 *      the last registered step". Steps without a cleanup function are only
 *      logged. No-op when the step array is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  int32_t idx = pos;
  while (idx >= 0) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pCur->name);
    if (pCur->cleanupFp != NULL) {
      (*pCur->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
689

690
/*
 * Run the init function of every registered step in registration order.
 *
 * On the first failing step the error is logged, mndCleanupSteps() is invoked
 * with that step's index, and the step's error code is returned. Steps with a
 * NULL init function are skipped. After all steps succeed, the cluster id is
 * cached in pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numSteps; ++idx) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    if (pCur->initFp != NULL) {
      int32_t rc = (*pCur->initFp)(pMnode);
      if (rc != 0) {
        mError("%s exec failed since %s, start to cleanup", pCur->name, tstrerror(rc));
        mndCleanupSteps(pMnode, idx);
        TAOS_RETURN(rc);
      }

      mInfo("%s is initialized", pCur->name);
      tmsgReportStartup(pCur->name, "initialized");
    }
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  TAOS_RETURN(0);
}
710

711
// Copy the caller-supplied open options into the mnode: the message callbacks,
// the local dnode id, and the sync replica configuration.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pSync->selfIndex = pOption->selfIndex;
  pSync->numOfReplicas = pOption->numOfReplicas;
  pSync->numOfTotalReplicas = pOption->numOfTotalReplicas;
  pSync->lastIndex = pOption->lastIndex;

  // The copy length is taken from the source arrays in pOption.
  (void)memcpy(pSync->replicas, pOption->replicas, sizeof(pOption->replicas));
  (void)memcpy(pSync->nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
}
1,748✔
721

722
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
1,748✔
723
  terrno = 0;
1,748✔
724
  mInfo("start to open mnode in %s", path);
1,748!
725

726
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
1,748!
727
  if (pMnode == NULL) {
1,748!
728
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
729
    mError("failed to open mnode since %s", terrstr());
×
730
    return NULL;
×
731
  }
732
  (void)memset(pMnode, 0, sizeof(SMnode));
1,748✔
733

734
  int32_t code = taosThreadRwlockInit(&pMnode->lock, NULL);
1,748✔
735
  if (code != 0) {
1,748!
736
    taosMemoryFree(pMnode);
×
737
    mError("failed to open mnode lock since %s", tstrerror(code));
×
738
    return NULL;
×
739
  }
740

741
  char timestr[24] = "1970-01-01 00:00:00.00";
1,748✔
742
  code = taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
1,748✔
743
  if (code < 0) {
1,748!
744
    mError("failed to parse time since %s", tstrerror(code));
×
745
    (void)taosThreadRwlockDestroy(&pMnode->lock);
×
746
    taosMemoryFree(pMnode);
×
747
    return NULL;
×
748
  }
749
  mndSetOptions(pMnode, pOption);
1,748✔
750

751
  pMnode->deploy = pOption->deploy;
1,748✔
752
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
1,748✔
753
  if (pMnode->pSteps == NULL) {
1,748!
754
    taosMemoryFree(pMnode);
×
755
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
756
    mError("failed to open mnode since %s", terrstr());
×
757
    return NULL;
×
758
  }
759

760
  code = mndCreateDir(pMnode, path);
1,748✔
761
  if (code != 0) {
1,748!
762
    code = terrno;
×
763
    mError("failed to open mnode since %s", tstrerror(code));
×
764
    mndClose(pMnode);
×
765
    terrno = code;
×
766
    return NULL;
×
767
  }
768

769
  code = mndInitSteps(pMnode);
1,748✔
770
  if (code != 0) {
1,748!
771
    code = terrno;
×
772
    mError("failed to open mnode since %s", tstrerror(code));
×
773
    mndClose(pMnode);
×
774
    terrno = code;
×
775
    return NULL;
×
776
  }
777

778
  code = mndExecSteps(pMnode);
1,748✔
779
  if (code != 0) {
1,748!
780
    code = terrno;
×
781
    mError("failed to open mnode since %s", tstrerror(code));
×
782
    mndClose(pMnode);
×
783
    terrno = code;
×
784
    return NULL;
×
785
  }
786

787
  mInfo("mnode open successfully");
1,748!
788
  return pMnode;
1,748✔
789
}
790

791
// Prepare an mnode for shutdown: ask sync to transfer leadership, notify sync of
// the upcoming stop, and write the sdb to file. Failures are logged and otherwise
// ignored. A NULL mnode is a no-op.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  int32_t code = syncLeaderTransfer(pMnode->syncMgmt.sync);
  if (code < 0) {
    mError("failed to transfer leader since %s", tstrerror(code));
  }

  // The result of syncPreStop(), if any, is not inspected here.
  syncPreStop(pMnode->syncMgmt.sync);

  code = sdbWriteFile(pMnode->pSdb, 0);
  if (code < 0) {
    mError("failed to write sdb since %s", tstrerror(code));
  }
}
1,747✔
806

807
// Tear down an mnode: run all step cleanups, then free the path string and the
// mnode object itself. A NULL mnode is a no-op.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
1,747✔
816

817
// Start a previously opened mnode: start sync, deploy the sdb when the mnode was
// opened in deploy mode (and mark it restored), reset grants, and start the timer.
// Returns -1 when sdb deployment fails, otherwise the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  int32_t timerCode = mndInitTimer(pMnode);
  return timerCode;
}
830

831
// Forward to syncIsCatchUp() for this mnode's sync handle and return its result.
int32_t mndIsCatchUp(SMnode *pMnode) {
  int64_t syncRid = pMnode->syncMgmt.sync;
  int32_t caughtUp = syncIsCatchUp(syncRid);
  return caughtUp;
}
835

836
// Return the sync role reported by syncGetRole() for this mnode's sync handle.
ESyncRole mndGetRole(SMnode *pMnode) {
  int64_t syncRid = pMnode->syncMgmt.sync;
  ESyncRole role = syncGetRole(syncRid);
  return role;
}
840

841
// Return the sync term reported by syncGetTerm() for this mnode's sync handle.
int64_t mndGetTerm(SMnode *pMnode) {
  int64_t syncRid = pMnode->syncMgmt.sync;
  int64_t term = syncGetTerm(syncRid);
  return term;
}
845

846
// Forward to syncGetArbToken() for this mnode's sync handle; the token is written
// into `outToken` by the callee and its return code is passed through.
int32_t mndGetArbToken(SMnode *pMnode, char *outToken) {
  int32_t code = syncGetArbToken(pMnode->syncMgmt.sync, outToken);
  return code;
}
56,721✔
847

848
// Stop a running mnode. The order is: mark the mnode as stopped first, then stop
// sync, then tear down the timer.
void mndStop(SMnode *pMnode) {
  // Set the stopped flag before anything else is shut down.
  mndSetStop(pMnode);

  mndSyncStop(pMnode);

  mndCleanupTimer(pMnode);
}
1,747✔
853

854
// Hand a sync message to syncProcessMsg() for the mnode referenced by the message.
// Returns the code from syncProcessMsg(); a non-zero code is logged as an error.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the mG* log macros

  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pMnode->syncMgmt.sync, pMsg);
  if (code == 0) {
    return code;
  }

  mGError("vgId:1, failed to process sync msg:%p type:%s, reason: %s, code:0x%x", pMsg, TMSG_INFO(pMsg->msgType),
          tstrerror(code), code);
  return code;
}
869

870
// Decide whether the mnode may process `pMsg` right now.
//
// Non-request messages and the scheduler (TDMT_SCH_*) message types listed below
// pass without any state check and without taking an rpc reference. For all other
// requests the mnode must not be stopped, its sync state must be leader, and both
// sync and the mnode must be restored; on success pMnode->rpcRef is incremented.
//
// Returns 0 when the message may be processed, TSDB_CODE_APP_IS_STOPPING when the
// mnode is stopped, the terrno left by syncGetState() when that fails, -1 when no
// mnode endpoint is available for a redirect, and otherwise
// TSDB_CODE_SYN_NOT_LEADER / TSDB_CODE_SYN_RESTORING. For rejected non-timer
// messages the serialized mnode ep set is attached to pMsg->info.rsp.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  int32_t code = 0;
  if (!IsReq(pMsg)) {
    TAOS_RETURN(code);
  }

  bool isSchMsg = pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
                  pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                  pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
                  pMsg->msgType == TDMT_SCH_DROP_TASK || pMsg->msgType == TDMT_SCH_TASK_NOTIFY;
  if (isSchMsg) {
    TAOS_RETURN(code);
  }

  SMnode *pMnode = pMsg->info.node;
  (void)taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = TSDB_CODE_APP_IS_STOPPING;
    TAOS_RETURN(code);
  }

  // terrno is cleared first so that a failure inside syncGetState() can be detected.
  terrno = 0;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  if (terrno != 0) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = terrno;
    TAOS_RETURN(code);
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!state.restored || !pMnode->restored) {
    code = TSDB_CODE_SYN_RESTORING;
  } else {
    // Accepted: take the rpc reference while the read lock is still held.
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    TAOS_RETURN(code);
  }
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  // Rejected. Internal timer messages are only traced; no redirect is built.
  bool isTimerMsg = pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
                    pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
                    pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
                    pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
                    pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
                    pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER ||
                    pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER || pMsg->msgType == TDMT_MND_CHECK_STREAM_TIMER;
  if (isTimerMsg) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    TAOS_RETURN(code);
  }

  const STraceId *trace = &pMsg->info.traceId;  // referenced by the mG* log macros
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mGDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d, type:%s",
      pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));

  if (epSet.numOfEps <= 0) {
    return -1;
  }

  for (int32_t ep = 0; ep < epSet.numOfEps; ep++) {
    mDebug("mnode index:%d, ep:%s:%u", ep, epSet.eps[ep].fqdn, epSet.eps[ep].port);
  }

  // First call with a NULL buffer yields the required length; the second fills it.
  int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  if (pMsg->info.rsp != NULL) {
    if (tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet) < 0) {
      mError("failed to serialize ep set");
    }
    pMsg->info.hasEpSet = 1;
    pMsg->info.rspLen = rspLen;
  }

  TAOS_RETURN(code);
}
959

960
// Dispatch an rpc message to the handler registered for its message type.
//   pMsg:       the message; pMsg->info.node is the target mnode.
//   pQueueInfo: passed through to extended handlers (MndMsgFpExt) only.
// Returns TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered for the type,
// the code from mndCheckMnodeState() when the mnode cannot take the message, and
// otherwise the handler's result (with -1 replaced by terrno).
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo *pQueueInfo) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the mG* log macros
  int32_t         code = TSDB_CODE_SUCCESS;

  // Look handlers up with the same index computation and the same TDMT_MAX bound
  // that mndSetMsgHandle()/mndSetMsgHandleExt() apply on registration, so an
  // out-of-range message type can never index past the handler tables.
  tmsg_t      type = TMSG_INDEX(pMsg->msgType);
  MndMsgFp    fp = NULL;
  MndMsgFpExt fpExt = NULL;
  if (type < TDMT_MAX) {
    fp = pMnode->msgFp[type];
    if (fp == NULL) {
      fpExt = pMnode->msgFpExt[type];
    }
  }

  if (fp == NULL && fpExt == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndCheckMnodeState(pMsg));

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  if (fp != NULL) {
    code = (*fp)(pMsg);
  } else {
    code = (*fpExt)(pMsg, pQueueInfo);
  }
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    // TODO: remove this translation once handlers stop returning a bare -1.
    if (code == -1) {
      code = terrno;
    }
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  TAOS_RETURN(code);
}
1000

1001
// Register `fp` as the handler for `msgType`. Message types whose index is not
// below TDMT_MAX are silently ignored.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) {
    return;
  }
  pMnode->msgFp[slot] = fp;
}
312,892✔
1007

1008
// Register `fp` as the extended handler for `msgType`. Message types whose index
// is not below TDMT_MAX are silently ignored.
void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) {
    return;
  }
  pMnode->msgFpExt[slot] = fp;
}
13,984✔
1014

1015
// Generate a non-zero 64-bit uid for `name` (uid 0 is reserved).
// The value combines three parts:
//   - the low 40 bits of the current microsecond timestamp, shifted left by 24
//   - the low 16 bits of the MurmurHash3 of `name`, shifted left by 8
//   - the low 8 bits of a random number
// The absolute value of that sum is returned; the loop retries if the sum is 0.
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t nameHash = MurmurHash3_32(name, len);

  for (;;) {
    int64_t nowUs = taosGetTimestampUs();
    int64_t tsBits = (nowUs & 0x000000FFFFFFFFFF) << 24;
    int64_t uid = tsBits + ((nameHash & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
    if (uid != 0) {
      return llabs(uid);
    }
  }
}
1027

1028
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
13✔
1029
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
1030
  int32_t code = mndAcquireRpc(pMnode);
13✔
1031
  if (code < 0) {
13!
1032
    TAOS_RETURN(code);
×
1033
  } else if (code == 1) {
13!
1034
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1035
  }
1036

1037
  SSdb   *pSdb = pMnode->pSdb;
13✔
1038
  int64_t ms = taosGetTimestampMs();
13✔
1039

1040
  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
13✔
1041
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
13✔
1042
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
13✔
1043
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
13✔
1044
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
13!
1045
      pStbInfo->stbs == NULL) {
13!
1046
    mndReleaseRpc(pMnode);
×
1047
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1048
    if (terrno != 0) code = terrno;
×
1049
    TAOS_RETURN(code);
×
1050
  }
1051

1052
  // cluster info
1053
  tstrncpy(pClusterInfo->version, td_version, sizeof(pClusterInfo->version));
13✔
1054
  pClusterInfo->monitor_interval = tsMonitorInterval;
13✔
1055
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
13✔
1056
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
13✔
1057
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
13✔
1058
  pClusterInfo->topics_toal = sdbGetSize(pSdb, SDB_TOPIC);
13✔
1059
  pClusterInfo->streams_total = sdbGetSize(pSdb, SDB_STREAM);
13✔
1060

1061
  void *pIter = NULL;
13✔
1062
  while (1) {
13✔
1063
    SDnodeObj *pObj = NULL;
26✔
1064
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
26✔
1065
    if (pIter == NULL) break;
26✔
1066

1067
    SMonDnodeDesc desc = {0};
13✔
1068
    desc.dnode_id = pObj->id;
13✔
1069
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
13✔
1070
    if (mndIsDnodeOnline(pObj, ms)) {
13✔
1071
      tstrncpy(desc.status, "ready", sizeof(desc.status));
12✔
1072
    } else {
1073
      tstrncpy(desc.status, "offline", sizeof(desc.status));
1✔
1074
    }
1075
    if (taosArrayPush(pClusterInfo->dnodes, &desc) == NULL) {
26!
1076
      mError("failed put dnode into array, but continue at this monitor report")
×
1077
    }
1078
    sdbRelease(pSdb, pObj);
13✔
1079
  }
1080

1081
  pIter = NULL;
13✔
1082
  while (1) {
13✔
1083
    SMnodeObj *pObj = NULL;
26✔
1084
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
26✔
1085
    if (pIter == NULL) break;
26✔
1086

1087
    SMonMnodeDesc desc = {0};
13✔
1088
    desc.mnode_id = pObj->id;
13✔
1089
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
13✔
1090

1091
    if (pObj->id == pMnode->selfDnodeId) {
13!
1092
      pClusterInfo->first_ep_dnode_id = pObj->id;
13✔
1093
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
13✔
1094
      // pClusterInfo->master_uptime = (float)mndGetClusterUpTime(pMnode) / 86400.0f;
1095
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
13✔
1096
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
1097
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
13✔
1098
      desc.syncState = TAOS_SYNC_STATE_LEADER;
13✔
1099
    } else {
1100
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
×
1101
      desc.syncState = pObj->syncState;
×
1102
    }
1103
    if (taosArrayPush(pClusterInfo->mnodes, &desc) == NULL) {
26!
1104
      mError("failed to put mnode into array, but continue at this monitor report");
×
1105
    }
1106
    sdbRelease(pSdb, pObj);
13✔
1107
  }
1108

1109
  // vgroup info
1110
  pIter = NULL;
13✔
1111
  while (1) {
32✔
1112
    SVgObj *pVgroup = NULL;
45✔
1113
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
45✔
1114
    if (pIter == NULL) break;
45✔
1115

1116
    pClusterInfo->vgroups_total++;
32✔
1117
    pClusterInfo->tbs_total += pVgroup->numOfTables;
32✔
1118

1119
    SMonVgroupDesc desc = {0};
32✔
1120
    desc.vgroup_id = pVgroup->vgId;
32✔
1121

1122
    SName name = {0};
32✔
1123
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
32✔
1124
    if (code < 0) {
32!
1125
      mError("failed to get db name since %s", tstrerror(code));
×
1126
      sdbRelease(pSdb, pVgroup);
×
1127
      TAOS_RETURN(code);
×
1128
    }
1129
    (void)tNameGetDbName(&name, desc.database_name);
32✔
1130

1131
    desc.tables_num = pVgroup->numOfTables;
32✔
1132
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
32✔
1133
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
32✔
1134
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
64✔
1135
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
32✔
1136
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
32✔
1137
      pVnDesc->dnode_id = pVgid->dnodeId;
32✔
1138
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
32✔
1139
      pVnDesc->syncState = pVgid->syncState;
32✔
1140
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
32!
1141
        tstrncpy(desc.status, "ready", sizeof(desc.status));
32✔
1142
        pClusterInfo->vgroups_alive++;
32✔
1143
      }
1144
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
32!
1145
        pClusterInfo->vnodes_alive++;
32✔
1146
      }
1147
      pClusterInfo->vnodes_total++;
32✔
1148
    }
1149

1150
    if (taosArrayPush(pVgroupInfo->vgroups, &desc) == NULL) {
64!
1151
      mError("failed to put vgroup into array, but continue at this monitor report")
×
1152
    }
1153
    sdbRelease(pSdb, pVgroup);
32✔
1154
  }
1155

1156
  // stb info
1157
  pIter = NULL;
13✔
1158
  while (1) {
12✔
1159
    SStbObj *pStb = NULL;
25✔
1160
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
25✔
1161
    if (pIter == NULL) break;
25✔
1162

1163
    SMonStbDesc desc = {0};
12✔
1164

1165
    SName name1 = {0};
12✔
1166
    code = tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
12✔
1167
    if (code < 0) {
12!
1168
      mError("failed to get db name since %s", tstrerror(code));
×
1169
      sdbRelease(pSdb, pStb);
×
1170
      TAOS_RETURN(code);
×
1171
    }
1172
    (void)tNameGetDbName(&name1, desc.database_name);
12✔
1173

1174
    SName name2 = {0};
12✔
1175
    code = tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
12✔
1176
    if (code < 0) {
12!
1177
      mError("failed to get table name since %s", tstrerror(code));
×
1178
      sdbRelease(pSdb, pStb);
×
1179
      TAOS_RETURN(code);
×
1180
    }
1181
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);
12✔
1182

1183
    if (taosArrayPush(pStbInfo->stbs, &desc) == NULL) {
24!
1184
      mError("failed to put stb into array, but continue at this monitor report");
×
1185
    }
1186
    sdbRelease(pSdb, pStb);
12✔
1187
  }
1188

1189
  // grant info
1190
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
13✔
1191
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
13✔
1192
  if (pMnode->grant.expireTimeMS == 0) {
13!
1193
    pGrantInfo->expire_time = 0;
×
1194
    pGrantInfo->timeseries_total = 0;
×
1195
  }
1196

1197
  mndReleaseRpc(pMnode);
13✔
1198
  TAOS_RETURN(code);
13✔
1199
}
1200

1201
// Copy the current sync state of the mnode (state, restored flag, term and role
// time) into `pLoad`. Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  mTrace("mnode get load");

  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  pLoad->syncState = syncState.state;
  pLoad->syncRestore = syncState.restored;
  pLoad->syncTerm = syncState.term;
  pLoad->roleTimeMs = syncState.roleTimeMs;

  mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
         syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);

  return 0;
}
1212

1213
// Return the roleTimeMs field of the sync state reported for this mnode.
int64_t mndGetRoleTimeMs(SMnode *pMnode) {
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);
  int64_t    roleTimeMs = syncState.roleTimeMs;
  return roleTimeMs;
}
1217

1218
// Set the mnode's restored flag under the write lock. When the flag is cleared,
// this call additionally blocks, polling every 3 ms, until pMnode->rpcRef has
// dropped to zero or below.
void mndSetRestored(SMnode *pMnode, bool restored) {
  (void)taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored ? true : false;
  (void)taosThreadRwlockUnlock(&pMnode->lock);
  mInfo("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  // Wait for in-flight rpc references to drain.
  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}
1,747✔
1235

1236
// Return the mnode's restored flag (read without taking the lock).
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}
×
1237

1238
// Mark the mnode as stopped. The flag is written while holding the write lock,
// and the change is logged after the lock has been released.
void mndSetStop(SMnode *pMnode) {
  (void)taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  mInfo("mnode set stopped");
}
1,747✔
1244

1245
// Return the mnode's stopped flag (read without taking the lock).
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}
738,994✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc