• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5048

10 May 2026 03:11AM UTC coverage: 73.222% (+0.07%) from 73.152%
#5048

push

travis-ci

web-flow
merge: from main to 3.0 branch #35290

353 of 452 new or added lines in 9 files covered. (78.1%)

587 existing lines in 140 files now uncovered.

278189 of 379928 relevant lines covered (73.22%)

135206397.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.17
/source/dnode/mnode/impl/src/mndMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndAcct.h"
18
#include "mndAnode.h"
19
#include "mndArbGroup.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndCompact.h"
23
#include "mndCompactDetail.h"
24
#include "mndConfig.h"
25
#include "mndConsumer.h"
26
#include "mndDb.h"
27
#include "mndDnode.h"
28
#include "mndEncryptAlgr.h"
29
#include "mndFunc.h"
30
#include "mndGrant.h"
31
#include "mndIndex.h"
32
#include "mndInfoSchema.h"
33
#include "mndInstance.h"
34
#include "mndMnode.h"
35
#include "mndMount.h"
36
#include "mndPerfSchema.h"
37
#include "mndPrivilege.h"
38
#include "mndProfile.h"
39
#include "mndQnode.h"
40
#include "mndQuery.h"
41
#include "mndRetention.h"
42
#include "mndRetentionDetail.h"
43
#include "mndRole.h"
44
#include "mndRsma.h"
45
#include "mndScan.h"
46
#include "mndScanDetail.h"
47
#include "mndSecurityPolicy.h"
48
#include "mndShow.h"
49
#include "mndSma.h"
50
#include "mndSnode.h"
51
#include "mndSsMigrate.h"
52
#include "mndStb.h"
53
#include "mndStream.h"
54
#include "mndSubscribe.h"
55
#include "mndSync.h"
56
#include "mndTelem.h"
57
#include "mndToken.h"
58
#include "mndTopic.h"
59
#include "mndTrans.h"
60
#include "mndUser.h"
61
#include "mndVgroup.h"
62
#include "mndView.h"
63
#include "mndXnode.h"
64
#include "tencrypt.h"
65

66
/*
 * Try to take a reference on pMnode->rpcRef before processing work on the mnode.
 *
 * Return value (passed through TAOS_RETURN):
 *   0                          - mnode running and leader; rpcRef was incremented,
 *                                pair with mndReleaseRpc().
 *   TSDB_CODE_APP_IS_STOPPING  - pMnode->stopped is set; no reference taken.
 *   1                          - mnode is not the leader; no reference taken.
 * The checks and the increment all happen while pMnode->lock is read-locked.
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t rc;

  (void)taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    rc = TSDB_CODE_APP_IS_STOPPING;
  } else if (mndIsLeader(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    rc = 0;
  } else {
    rc = 1;  // not leader; the caller in this file only tests for != 0
  }
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  TAOS_RETURN(rc);
}
84

85
/*
 * Drop the reference taken by a successful mndAcquireRpc().
 * The decrement of pMnode->rpcRef is done while pMnode->lock is read-locked,
 * mirroring the acquire side.
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  (void)taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);  // balances the increment in mndAcquireRpc()
  (void)taosThreadRwlockUnlock(&pMnode->lock);
}
95

96
static void *mndBuildTimerMsg(int32_t *pContLen) {
89,120,022✔
97
  terrno = 0;
89,120,022✔
98
  SMTimerReq timerReq = {0};
89,120,022✔
99

100
  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
89,120,022✔
101
  if (contLen <= 0) return NULL;
89,120,022✔
102
  void *pReq = rpcMallocCont(contLen);
89,120,022✔
103
  if (pReq == NULL) return NULL;
89,120,022✔
104

105
  if (tSerializeSMTimerMsg(pReq, contLen, &timerReq) < 0) {
89,120,022✔
106
    mError("failed to serialize timer msg since %s", terrstr());
×
107
  }
108
  *pContLen = contLen;
89,120,022✔
109
  return pReq;
89,120,022✔
110
}
111

112
/*
 * Enqueue a TDMT_MND_TRANS_TIMER message on the mnode write queue.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupTrans(SMnode *pMnode) {
  mTrace("pullup trans msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
124

125
/*
 * Enqueue a TDMT_MND_COMPACT_TIMER message on the mnode write queue.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupCompacts(SMnode *pMnode) {
  mTrace("pullup compact timer msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
137

138
/*
 * Enqueue a TDMT_MND_SCAN_TIMER message on the mnode write queue.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupScans(SMnode *pMnode) {
  mTrace("pullup scan timer msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_SCAN_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
150

151
/*
 * Enqueue a TDMT_MND_INSTANCE_TIMER message on the mnode write queue
 * (the dispatcher uses it for the instance-expiry check).
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupInstances(SMnode *pMnode) {
  mTrace("pullup instance timer msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_INSTANCE_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
162

163
/*
 * Enqueue a TDMT_MND_TTL_TIMER message on the mnode write queue.
 *
 * If the timer payload cannot be built, nothing is enqueued. Previously a
 * message with a NULL payload was sent, unlike the other pull-up helpers.
 * A failed enqueue is logged.
 */
static void mndPullupTtl(SMnode *pMnode) {
  mTrace("pullup ttl");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
173

174
/*
 * Enqueue a TDMT_MND_TRIM_DB_TIMER message on the mnode write queue.
 *
 * If the timer payload cannot be built, nothing is enqueued rather than sending
 * a NULL payload. A failed enqueue is logged.
 */
static void mndPullupTrimDb(SMnode *pMnode) {
  mTrace("pullup trim");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
184

185
/*
 * Enqueue a TDMT_MND_QUERY_TRIM_TIMER message on the mnode write queue.
 *
 * If the timer payload cannot be built, nothing is enqueued rather than sending
 * a NULL payload. A failed enqueue is logged.
 */
static void mndPullupQueryTrimDb(SMnode *pMnode) {
  mTrace("pullup trim query");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_QUERY_TRIM_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
194

195
/*
 * Enqueue a TDMT_MND_SSMIGRATE_DB_TIMER message on the mnode write queue.
 *
 * Does nothing unless grantCheck(TSDB_GRANT_SHARED_STORAGE) succeeds. If the
 * timer payload cannot be built, nothing is enqueued rather than sending a NULL
 * payload. A failed enqueue is logged.
 */
static void mndPullupSsMigrateDb(SMnode *pMnode) {
  if (grantCheck(TSDB_GRANT_SHARED_STORAGE) != TSDB_CODE_SUCCESS) {
    return;
  }

  mTrace("pullup ssmigrate db");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_SSMIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
208

209
/*
 * Enqueue a TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER message on the mnode write queue.
 *
 * If the timer payload cannot be built, nothing is enqueued rather than sending
 * a NULL payload. A failed enqueue is logged.
 */
static void mndPullupUpdateSsMigrateProgress(SMnode *pMnode) {
  mTrace("pullup update ssmigrate progress");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
218

219
/*
 * Enqueue a TDMT_MND_ARB_HEARTBEAT_TIMER message (noResp) on the arbitrator queue.
 *
 * Returns the tmsgPutToQueue() result. If the timer payload cannot be built,
 * returns terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset,
 * instead of enqueuing a message with a NULL payload.
 */
static int32_t mndPullupArbHeartbeat(SMnode *pMnode) {
  mTrace("pullup arb hb");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_HEARTBEAT_TIMER, .pCont = pReq, .contLen = contLen, .info.noResp = 1};
  return tmsgPutToQueue(&pMnode->msgCb, ARB_QUEUE, &rpcMsg);
}
226

227
/*
 * Enqueue a TDMT_MND_ARB_CHECK_SYNC_TIMER message (noResp) on the arbitrator queue.
 *
 * Returns the tmsgPutToQueue() result. If the timer payload cannot be built,
 * returns terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset,
 * instead of enqueuing a message with a NULL payload.
 */
static int32_t mndPullupArbCheckSync(SMnode *pMnode) {
  mTrace("pullup arb sync");
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_CHECK_SYNC_TIMER, .pCont = pReq, .contLen = contLen, .info.noResp = 1};
  return tmsgPutToQueue(&pMnode->msgCb, ARB_QUEUE, &rpcMsg);
}
234

235
/*
 * Enqueue a TDMT_MND_TMQ_TIMER message on the mnode write queue.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
245

246
/*
 * Enqueue a TDMT_MND_TELEM_TIMER message on the mnode READ queue
 * (unlike most timers here, which use the write queue).
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupTelem(SMnode *pMnode) {
  mTrace("pullup telem msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  if (tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into read-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
258

259
/*
 * Enqueue a TDMT_MND_GRANT_HB_TIMER message on the mnode write queue.
 * The message carries no ahandle and sets info.notFreeAhandle.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupGrant(SMnode *pMnode) {
  mTrace("pullup grant msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER,
      .pCont = pReq,
      .contLen = contLen,
      .info.notFreeAhandle = 1,
      .info.ahandle = 0,
  };
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
275

276
/*
 * Enqueue a TDMT_MND_AUTH_HB_TIMER message on the mnode write queue.
 * The message carries no ahandle and sets info.notFreeAhandle.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndPullupAuth(SMnode *pMnode) {
  mTrace("pullup auth msg");

  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_AUTH_HB_TIMER,
      .pCont = pReq,
      .contLen = contLen,
      .info.notFreeAhandle = 1,
      .info.ahandle = 0,
  };
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
    mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
  }
}
288

289
/*
 * Enqueue a TDMT_MND_UPTIME_TIMER message on the mnode write queue.
 * The message carries no ahandle and sets info.notFreeAhandle.
 * Nothing is sent if the timer payload cannot be built; a failed enqueue is logged.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  mTrace("increase uptime");  // fixed typo ("increate") in the trace message
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPTIME_TIMER,
                      .pCont = pReq,
                      .contLen = contLen,
                      .info.notFreeAhandle = 1,
                      .info.ahandle = 0};
    if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
      mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
    }
  }
}
305

306
/*
 * Mark the replicas hosted on dnode `dnodeId` as offline in every vgroup.
 *
 * In each vgroup only the first replica whose dnodeId matches is considered.
 * If that replica is not already TAOS_SYNC_STATE_OFFLINE, its sync fields are
 * reset: state offline, restore and canRead cleared, startTimeMs and
 * learnerProgress zeroed, snapSeq set to -1. For each vgroup changed this way,
 * the owning database's stateTs is set to curMs if it differs.
 */
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Locate the (first) replica placed on the offline dnode, if any.
    SVnodeGid *pGid = NULL;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == dnodeId) {
        pGid = &pVgroup->vnodeGid[i];
        break;
      }
    }

    if (pGid != NULL && pGid->syncState != TAOS_SYNC_STATE_OFFLINE) {
      mInfo(
          "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:offline "
          "restored:0 "
          "canRead:0",
          pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead);
      pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
      pGid->syncRestore = 0;
      pGid->syncCanRead = 0;
      pGid->startTimeMs = 0;
      pGid->learnerProgress = 0;
      pGid->snapSeq = -1;

      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb != NULL && pDb->stateTs != curMs) {
        mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
              curMs);
        pDb->stateTs = curMs;
      }
      mndReleaseDb(pMnode, pDb);  // called even when pDb is NULL, as before
    }

    sdbRelease(pSdb, pVgroup);
  }
}
350

351
/*
 * Scan all dnodes and, for each one mndIsDnodeOnline() reports as offline,
 * mark its vgroup replicas offline via mndSetVgroupOffline().
 *
 * Returns immediately if mndAcquireRpc() fails (mnode stopping or not leader).
 * Otherwise holds the RPC reference for the whole scan.
 */
static void mndCheckDnodeOffline(SMnode *pMnode) {
  mTrace("check dnode offline");
  if (mndAcquireRpc(pMnode) != 0) return;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t nowMs = taosGetTimestampMs();  // one timestamp shared by every dnode in this pass
  void   *pIter = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }

    sdbRelease(pSdb, pDnode);
  }

  mndReleaseRpc(pMnode);
}
375

376
/*
 * Return true when this mnode should not run leader-only timer work.
 *
 * Clears terrno, then queries the sync state under pMnode->lock (read).
 *  - syncGetState() set terrno          -> true; terrno is left as set.
 *  - state is not TAOS_SYNC_STATE_LEADER -> true; terrno = TSDB_CODE_SYN_NOT_LEADER.
 *  - sync or mnode not yet restored      -> true; terrno = TSDB_CODE_SYN_RESTORING.
 * Otherwise returns false. terrno is assigned only after the lock is released.
 */
static bool mnodeIsNotLeader(SMnode *pMnode) {
  int32_t reason = 0;

  terrno = 0;
  (void)taosThreadRwlockRdlock(&pMnode->lock);
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  bool       queryFailed = (terrno != 0);
  if (!queryFailed) {
    if (state.state != TAOS_SYNC_STATE_LEADER) {
      reason = TSDB_CODE_SYN_NOT_LEADER;
    } else if (!state.restored || !pMnode->restored) {
      reason = TSDB_CODE_SYN_RESTORING;
    }
  }
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  if (queryFailed) {
    return true;
  }
  if (reason != 0) {
    terrno = reason;
    return true;
  }
  return false;
}
398

399
static int32_t minCronTime() {
×
400
  int32_t min = INT32_MAX;
×
401
  min = TMIN(min, tsTtlPushIntervalSec);
×
402
  min = TMIN(min, tsTrimVDbIntervalSec);
×
403
  min = TMIN(min, tsSsAutoMigrateIntervalSec);
×
404
  min = TMIN(min, tsTransPullupInterval);
×
405
  min = TMIN(min, tsCompactPullupInterval);
×
406
  min = TMIN(min, tsMqRebalanceInterval);
×
407

408
  int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
×
409
  min = TMIN(min, telemInt);
×
410
  min = TMIN(min, tsGrantHBInterval);
×
411
  min = TMIN(min, tsUptimeInterval);
×
412

413
  return min <= 1 ? 2 : min;
×
414
}
415
/*
 * Dispatch the per-second periodic mnode tasks.
 *
 * `sec` is a wall-clock second (in this file, mndThreadSecFp() passes
 * taosGetTimestampMs() / 1000). Each task fires when `sec` is a multiple of its
 * configured interval; instance and telemetry tasks are skipped when their
 * interval is <= 0. The unused local `code` from the previous version is removed.
 */
void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
#ifndef TD_ASTRA
  if (sec % tsGrantHBInterval == 0) {  // put in the 1st place as to take effect ASAP
    mndPullupGrant(pMnode);
  }
  if (sec % tsTtlPushIntervalSec == 0) {
    mndPullupTtl(pMnode);
  }

  if (sec % tsTrimVDbIntervalSec == 0) {
    mndPullupTrimDb(pMnode);
  }

  if (sec % tsQueryTrimIntervalSec == 0) {
    mndPullupQueryTrimDb(pMnode);
  }
#endif
#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    if (sec % tsQuerySsMigrateIntervalSec == 0) {
      mndPullupUpdateSsMigrateProgress(pMnode);
    }
    if (tsSsEnabled == 2) {
      // By default, both tsTrimVDbIntervalSec and tsSsAutoMigrateIntervalSec are 3600 seconds,
      // so, delay half interval to do ss migrate to avoid conflict.
      //
      // NOTE: this solution is not perfect, there could still be conflict if user changes the
      // default value, but it is good enough as user is unlikely to change the default value.
      // The best solution is adding a new offset config to all cron tasks, but that would add
      // extra complexity.
      if ((sec % tsSsAutoMigrateIntervalSec) == (tsSsAutoMigrateIntervalSec / 2)) {
        mndPullupSsMigrateDb(pMnode);
      }
    }
  }
#endif
#ifdef TD_ENTERPRISE
  if (tsAuthReq) {
    if (sec % tsAuthReqHBInterval == 0) {
      mndPullupAuth(pMnode);
    }
  }
#endif
  if (sec % tsTransPullupInterval == 0) {
    mndPullupTrans(pMnode);
  }

  if (sec % tsCompactPullupInterval == 0) {
    mndPullupCompacts(pMnode);
  }

  if (sec % tsScanPullupInterval == 0) {
    mndPullupScans(pMnode);
  }
  if (tsInstancePullupInterval > 0 && sec % tsInstancePullupInterval == 0) {  // check instance expired
    mndPullupInstances(pMnode);
  }
#ifdef USE_TOPIC
  if (sec % tsMqRebalanceInterval == 0) {
    mndCalMqRebalance(pMnode);
  }
#endif
  if (tsTelemInterval > 0 && sec % tsTelemInterval == 0) {
    mndPullupTelem(pMnode);
  }
  if (sec % tsUptimeInterval == 0) {
    mndIncreaseUpTime(pMnode);
  }
}
485

486
/*
 * Dispatch arbitrator timer messages for tick `ms` (milliseconds).
 * Heartbeat and sync-check messages fire when `ms` is a multiple of
 * tsArbHeartBeatIntervalMs / tsArbCheckSyncIntervalMs respectively;
 * enqueue failures are logged. Compiled out on TD_ASTRA builds.
 */
void mndDoArbTimerPullupTask(SMnode *pMnode, int64_t ms) {
#ifndef TD_ASTRA
  int32_t rc;

  if (ms % (tsArbHeartBeatIntervalMs) == 0) {
    rc = mndPullupArbHeartbeat(pMnode);
    if (rc != 0) {
      mError("failed to pullup arb heartbeat, since:%s", tstrerror(rc));
    }
  }

  if (ms % (tsArbCheckSyncIntervalMs) == 0) {
    rc = mndPullupArbCheckSync(pMnode);
    if (rc != 0) {
      mError("failed to pullup arb check sync, since:%s", tstrerror(rc));
    }
  }
#endif
}
502

503
/* Run the dnode offline check whenever tick `ms` is a multiple of tsStatusTimeoutMs. */
void mndDoTimerCheckStatus(SMnode *pMnode, int64_t ms) {
  if (ms % (tsStatusTimeoutMs) != 0) {
    return;
  }
  mndCheckDnodeOffline(pMnode);
}
508

509
/*
 * Per-second sync housekeeping:
 *  - every MNODE_TIMEOUT_SEC / 2 seconds, call mndSyncCheckTimeout();
 *  - every MND_STREAM_HEALTH_CHECK_PERIOD_SEC seconds, call msmHealthCheck(),
 *    unless streams are disabled (tsDisableStream).
 */
void mndDoTimerCheckSync(SMnode *pMnode, int64_t sec) {
  if ((sec % (MNODE_TIMEOUT_SEC / 2)) == 0) {
    mndSyncCheckTimeout(pMnode);
  }

  bool streamEnabled = !tsDisableStream;
  if (streamEnabled && (sec % MND_STREAM_HEALTH_CHECK_PERIOD_SEC) == 0) {
    msmHealthCheck(pMnode);
  }
}
517

518
static void *mndThreadSecFp(void *param) {
514,127✔
519
  SMnode *pMnode = param;
514,127✔
520
  int64_t lastSec = 0;
514,127✔
521
  setThreadName("mnode-timer");
514,127✔
522
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
514,127✔
523

524
  while (1) {
365,741,063✔
525
    if (mndGetStop(pMnode)) break;
366,255,190✔
526

527
    int64_t nowSec = taosGetTimestampMs() / 1000;
365,741,063✔
528
    if (nowSec == lastSec) {
365,741,063✔
529
      taosMsleep(100);
328,454,234✔
530
      continue;
328,454,234✔
531
    }
532
    lastSec = nowSec;
37,286,829✔
533

534
    if (mnodeIsNotLeader(pMnode)) {
37,286,829✔
535
      taosMsleep(100);
1,313,881✔
536
      mTrace("timer not process since mnode is not leader");
1,313,881✔
537
      continue;
1,313,881✔
538
    }
539

540
    mndDoTimerCheckSync(pMnode, nowSec);
35,972,948✔
541

542
    mndDoTimerPullupTask(pMnode, nowSec);
35,972,948✔
543

544
    taosMsleep(100);
35,972,948✔
545
  }
546

547
  return NULL;
514,127✔
548
}
549

550
static void *mndThreadMsFp(void *param) {
514,127✔
551
  SMnode *pMnode = param;
514,127✔
552
  int64_t lastTime = 0;
514,127✔
553
  setThreadName("mnode-arb-timer");
514,127✔
554
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
514,127✔
555

556
  while (1) {
557
    lastTime += 100;
365,652,858✔
558
    taosMsleep(100);
365,652,858✔
559

560
    if (mndGetStop(pMnode)) break;
365,652,858✔
561
    if (lastTime % 10 != 0) continue;
365,138,731✔
562

563
    if (mnodeIsNotLeader(pMnode)) {
365,138,731✔
564
      mTrace("timer not process since mnode is not leader");
12,532,371✔
565
      continue;
12,532,371✔
566
    }
567

568
    mndDoTimerCheckStatus(pMnode, lastTime);
352,606,360✔
569

570
    mndDoArbTimerPullupTask(pMnode, lastTime);
352,606,360✔
571
  }
572

573
  return NULL;
514,127✔
574
}
575

576
/*
 * Start the two joinable mnode timer threads: mndThreadSecFp (pMnode->thread)
 * and mndThreadMsFp (pMnode->arbThread).
 *
 * Returns 0 on success, or the taosThreadCreate() error code. Each thread
 * attribute object is now destroyed whether or not thread creation succeeds;
 * previously it leaked on the error paths.
 * NOTE(review): if the second thread fails, the first is left running;
 * presumably the caller's cleanup (mndCleanupTimer) joins it — confirm.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadSecFp, pMnode);
  (void)taosThreadAttrDestroy(&thAttr);  // not needed after create, on success or failure
  if (code != 0) {
    mError("failed to create timer thread since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  tmsgReportStartup("mnode-timer", "initialized");

  TdThreadAttr arbAttr;
  (void)taosThreadAttrInit(&arbAttr);
  (void)taosThreadAttrSetDetachState(&arbAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&arbAttr, STACK_SIZE_SMALL);
#endif
  code = taosThreadCreate(&pMnode->arbThread, &arbAttr, mndThreadMsFp, pMnode);
  (void)taosThreadAttrDestroy(&arbAttr);
  if (code != 0) {
    mError("failed to create arb timer thread since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  tmsgReportStartup("mnode-timer", "initialized");
  TAOS_RETURN(code);
}
607

608
/*
 * Join and clear both timer threads started by mndInitTimer().
 * A thread handle that taosCheckPthreadValid() rejects is skipped.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  // Per-second timer thread (mndThreadSecFp).
  if (taosCheckPthreadValid(pMnode->thread)) {
    (void)taosThreadJoin(pMnode->thread, NULL);
    taosThreadClear(&pMnode->thread);
  }

  // 100 ms arbitrator/status timer thread (mndThreadMsFp).
  if (taosCheckPthreadValid(pMnode->arbThread)) {
    (void)taosThreadJoin(pMnode->arbThread, NULL);
    taosThreadClear(&pMnode->arbThread);
  }
}
618

619
/*
 * Store a heap copy of `path` in pMnode->path and make sure the directory exists.
 * Returns 0 on success, otherwise the terrno left by taosStrdup() or taosMkDir().
 * pMnode->path is not freed here on failure.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  int32_t code = 0;

  pMnode->path = taosStrdup(path);
  if (pMnode->path == NULL || taosMkDir(pMnode->path) != 0) {
    code = terrno;
  }

  TAOS_RETURN(code);
}
634

635
/*
 * Open the mnode WAL in "<pMnode->path><TD_DIRSEP>wal" (vgId 1, fsync level).
 *
 * On enterprise builds, first waits for taosWaitCfgKeyLoaded() and, when
 * tsMetaKey is non-empty, copies it into the WAL encryption key.
 * Returns 0 on success. On failure returns terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when walOpen() fails without setting terrno.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  int32_t code = 0;
  char    walDir[PATH_MAX + 20] = {0};
  (void)snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg cfg = {.vgId = 1,
                 .fsyncPeriod = 0,
                 .rollPeriod = -1,
                 .segSize = -1,
                 .committed = -1,
                 .retentionPeriod = 0,
                 .retentionSize = 0,
                 .level = TAOS_WAL_FSYNC,
                 .encryptAlgr = 0,
                 .encryptData = {0}};

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // NOTE(review): presumably waits until key material such as tsMetaKey is available — confirm.
  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;
    TAOS_RETURN(code);
  }
  if (tsMetaKey[0] != '\0') {
    tstrncpy(cfg.encryptData.encryptKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
  }
#endif

  pMnode->pWal = walOpen(walDir, &cfg);
  if (pMnode->pWal != NULL) {
    TAOS_RETURN(code);
  }

  code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  mError("failed to open wal since %s. wal:%s", tstrerror(code), walDir);
  TAOS_RETURN(code);
}
670

671
/* Close the mnode WAL if it is open and clear the handle; safe to call twice. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}
677

678
// Forward declarations for mmFile.c functions
679
extern int32_t mmReadFile(const char *path, SMnodeOpt *pOption);
680
extern int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption);
681

682
// Callback function to persist encrypted flag to mnode.json
683
static int32_t mndPersistEncryptedFlag(void *param) {
7,993✔
684
  SMnode *pMnode = (SMnode *)param;
7,993✔
685
  if (pMnode == NULL) {
7,993✔
686
    return TSDB_CODE_INVALID_PARA;
×
687
  }
688
  
689
  mInfo("persisting encrypted flag to mnode.json");
7,993✔
690
  
691
  SMnodeOpt option = {0};
7,993✔
692
  int32_t code = mmReadFile(pMnode->path, &option);
7,993✔
693
  if (code != 0) {
7,993✔
694
    mError("failed to read mnode.json for persisting encrypted flag since %s", tstrerror(code));
×
695
    return code;
×
696
  }
697
  
698
  option.encrypted = true;
7,993✔
699
  code = mmWriteFile(pMnode->path, &option);
7,993✔
700
  if (code != 0) {
7,993✔
701
    mError("failed to write mnode.json for persisting encrypted flag since %s", tstrerror(code));
×
702
    return code;
×
703
  }
704
  
705
  // Also update mnode's encrypted flag
706
  pMnode->encrypted = true;
7,993✔
707
  
708
  mInfo("successfully persisted encrypted flag to mnode.json");
7,993✔
709
  return 0;
7,993✔
710
}
711

712
/*
 * Create the sdb for this mnode using its path, the mnode itself and the opened WAL.
 * Returns 0 on success. On failure returns terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when sdbInit() fails without setting terrno.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode, .pWal = pMnode->pWal};

  pMnode->pSdb = sdbInit(&opt);
  if (pMnode->pSdb != NULL) {
    TAOS_RETURN(0);
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
728

729
/*
 * Finish sdb setup: copy the encrypted flag, install the callback the sdb uses to
 * persist that flag, load the sdb file unless pMnode->deploy is set, then record
 * the sdb commitIndex as pMnode->applied.
 * Returns sdbReadFile()'s result, or 0 when no file is read. The log line and
 * the applied update happen in either case.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;

  pSdb->encrypted = pMnode->encrypted;
  pSdb->persistEncryptedFlagFp = mndPersistEncryptedFlag;
  pSdb->pMnodeForCallback = pMnode;

  // NOTE(review): `deploy` presumably marks a first-time deployment with no sdb file yet — confirm.
  if (!pMnode->deploy) {
    code = sdbReadFile(pSdb);
  }

  mInfo("vgId:1, mnode sdb is opened, with applied index:%" PRId64, pSdb->commitIndex);
  atomic_store_64(&pMnode->applied, pSdb->commitIndex);
  return code;
}
747

748
/* Release the sdb if one exists and clear the handle; safe to call twice. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}
754

755
/*
 * Append an init/cleanup step to pMnode->pSteps.
 * `name` is stored as given (not copied). Returns 0, or terrno if the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
    TAOS_RETURN(terrno);
  }
  TAOS_RETURN(0);
}
766

767
/*
 * Register every mnode subsystem's init/cleanup pair, in order, via mndAllocStep().
 *
 * The table order is the registration order and must be preserved; for example,
 * the WAL precedes "mnode-sdb" (mndInitSdb reads pMnode->pWal), and mndOpenSdb
 * runs after the object modules. Returns 0, or the first mndAllocStep() error.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } kSteps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-security-policy", mndInitSecurityPolicy, mndCleanupSecurityPolicy},
      {"mnode-encrypt-algorithms", mndInitEncryptAlgr, mndCleanupEncryptAlgr},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-anode", mndInitAnode, mndCleanupAnode},
      {"mnode-bnode", mndInitBnode, mndCleanupBnode},
      {"mnode-xnode", mndInitXnode, mndCleanupXnode},
      {"mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup},
      {"mnode-config", mndInitConfig, NULL},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-role", mndInitRole, mndCleanupRole},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-token", mndInitToken, mndCleanupToken},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-instance", mndInitInstance, mndCleanupInstance},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-idx", mndInitIdx, mndCleanupIdx},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
#ifdef USE_MOUNT
      {"mnode-mount", mndInitMount, mndCleanupMount},
      {"mnode-mount-log", mndInitMountLog, mndCleanupMountLog},
#endif
      {"mnode-rsma", mndInitRsma, mndCleanupRsma},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-view", mndInitView, mndCleanupView},
      {"mnode-compact", mndInitCompact, mndCleanupCompact},
      {"mnode-scan", mndInitScan, mndCleanupScan},
      {"mnode-retention", mndInitRetention, mndCleanupRetention},
      {"mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail},
      {"mnode-scan-detail", mndInitScanDetail, mndCleanupScanDetail},
      {"mnode-retention-detail", mndInitRetentionDetail, mndCleanupRetentionDetail},
      {"mnode-ssmigrate", mndInitSsMigrate, mndCleanupSsMigrate},
      // NOTE(review): reuses the name "mnode-sdb"; this entry opens/loads the sdb.
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  for (size_t i = 0; i < sizeof(kSteps) / sizeof(kSteps[0]); ++i) {
    TAOS_CHECK_RETURN(mndAllocStep(pMnode, kSteps[i].name, kSteps[i].initFp, kSteps[i].cleanupFp));
  }
  return 0;
}
823

824
/*
 * Run the cleanup callbacks of registered mnode steps in reverse order and
 * release the step array.
 *
 * pos: index of the last step to clean up (inclusive); -1 means every
 *      registered step.
 * Steps whose cleanupFp is NULL are only logged. Returns immediately when
 * the step array has already been released (pSteps == NULL).
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t last = (pos == -1) ? (int32_t)taosArrayGetSize(pMnode->pSteps) - 1 : pos;

  for (int32_t idx = last; idx >= 0; --idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp == NULL) {
      continue;
    }
    (*pStep->cleanupFp)(pMnode);
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
843

844
/*
 * Run the init callback of every registered step in registration order.
 *
 * Steps without an initFp are skipped. When a step fails, steps 0..failing
 * step (inclusive) are cleaned up via mndCleanupSteps() and the step's
 * error code is returned. On success the cluster id is cached in
 * pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) {
      continue;
    }

    int32_t code = (*pStep->initFp)(pMnode);
    if (code != 0) {
      mError("%s exec failed since %s, start to cleanup", pStep->name, tstrerror(code));
      mndCleanupSteps(pMnode, idx);
      TAOS_RETURN(code);
    }

    mInfo("%s is initialized", pStep->name);
    tmsgReportStartup(pStep->name, "initialized");
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  TAOS_RETURN(0);
}
864

865
/*
 * Copy the caller-supplied open options into the mnode: message callbacks,
 * own dnode id, encryption flag and the sync replica layout.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->encrypted = pOption->encrypted;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  pMgmt->numOfTotalReplicas = pOption->numOfTotalReplicas;
  pMgmt->lastIndex = pOption->lastIndex;
  (void)memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
  (void)memcpy(pMgmt->nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
}
514,282✔
876

877
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
514,282✔
878
  terrno = 0;
514,282✔
879
  mInfo("start to open mnode in %s", path);
514,282✔
880

881
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
514,282✔
882
  if (pMnode == NULL) {
514,282✔
883
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
884
    mError("failed to open mnode in step 1, since %s", terrstr());
×
885
    return NULL;
×
886
  }
887
  (void)memset(pMnode, 0, sizeof(SMnode));
514,282✔
888

889
  int32_t code = taosThreadRwlockInit(&pMnode->lock, NULL);
514,282✔
890
  if (code != 0) {
514,282✔
891
    taosMemoryFree(pMnode);
×
892
    mError("failed to open mnode in step 2, add lock, since %s", tstrerror(code));
×
893
    terrno = code;
×
894
    return NULL;
×
895
  }
896

897
  mInfo("vgId:1, mnode set options to syncMgmt, dnodeId:%d, numOfTotalReplicas:%d", pOption->selfIndex,
514,282✔
898
        pOption->numOfTotalReplicas);
899
  mndSetOptions(pMnode, pOption);
514,282✔
900

901
  pMnode->deploy = pOption->deploy;
514,282✔
902
  pMnode->version = pOption->version;
514,282✔
903
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
514,282✔
904
  if (pMnode->pSteps == NULL) {
514,282✔
905
    taosMemoryFree(pMnode);
×
906
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
907
    mError("failed to open mnode in step 4, since %s", terrstr());
×
908
    return NULL;
×
909
  }
910

911
  code = mndCreateDir(pMnode, path);
514,282✔
912
  if (code != 0) {
514,282✔
913
    mError("failed to open mnode in step 5, since %s", tstrerror(code));
×
914
    mndClose(pMnode);
×
915
    terrno = code;
×
916
    return NULL;
×
917
  }
918

919
  code = mndInitSteps(pMnode);
514,282✔
920
  if (code != 0) {
514,282✔
921
    mError("failed to open mnode in step 6, since %s", tstrerror(code));
×
922
    mndClose(pMnode);
×
923
    terrno = code;
×
924
    return NULL;
×
925
  }
926

927
  code = mndExecSteps(pMnode);
514,282✔
928
  if (code != 0) {
514,282✔
929
    mError("failed to open mnode in step 7, since %s", tstrerror(code));
×
930
    mndClose(pMnode);
×
931
    terrno = code;
×
932
    return NULL;
×
933
  }
934

935
  mInfo("mnode open successfully");
514,282✔
936
  return pMnode;
514,282✔
937
}
938

939
/*
 * Prepare the mnode for shutdown. The steps are:
 *   - ask sync to transfer leadership;
 *   - pre-stop the sync node;
 *   - write the sdb file.
 * Failures are logged and the sequence continues. A NULL pMnode is ignored.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  int32_t code = syncLeaderTransfer(pMnode->syncMgmt.sync);
  if (code < 0) {
    mError("failed to transfer leader since %s", tstrerror(code));
  }

  syncPreStop(pMnode->syncMgmt.sync);

  code = sdbWriteFile(pMnode->pSdb, 0);
  if (code < 0) {
    mError("failed to write sdb since %s", tstrerror(code));
  }
}
514,127✔
954

955
/*
 * Tear down an mnode. Runs every registered step cleanup, then frees the
 * path string and the SMnode itself. A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
514,214✔
964

965
/*
 * Start an opened mnode. The steps are:
 *   - start sync;
 *   - on first deployment, write the initial sdb data and mark the mnode
 *     restored;
 *   - on the leader, upgrade sdb data to the mnode's version (enterprise
 *     builds may also enforce separation of duties);
 *   - set the builtin data version, reset grants and start the timer.
 *
 * Returns 0 on success, or the error code of the stage that failed.
 * Previously deploy/upgrade failures were reported as a bare -1.
 */
int32_t mndStart(SMnode *pMnode) {
  int32_t code = 0;
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    code = sdbDeploy(pMnode->pSdb);
    if (code != 0) {
      mError("failed to deploy sdb while start mnode since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    mndSetRestored(pMnode, true);
  }

  if (mndIsLeader(pMnode)) {
    code = sdbUpgrade(pMnode->pSdb, pMnode->version);
    if (code != 0) {
      mError("failed to upgrade sdb while start mnode since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
#ifdef TD_ENTERPRISE
    if (tsSodEnforceMode) {
      if ((code = mndProcessEnforceSod(pMnode)) != 0) {
        if (code == TSDB_CODE_MND_ROLE_NO_VALID_SYSDBA || code == TSDB_CODE_MND_ROLE_NO_VALID_SYSSEC ||
            code == TSDB_CODE_MND_ROLE_NO_VALID_SYSAUDIT) {
          mInfo("enter SoD pending mode. Enforce SoD by command line failed since %s", tstrerror(code));
        } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
          mInfo("enter SoD pending mode. Enforce SoD is in progress");
        } else {
          mError("failed to enforce SoD by command line since %s", tstrerror(code));
          TAOS_RETURN(code);
        }
      } else {
        mndSetSoDPhase(pMnode, TSDB_SOD_PHASE_STABLE);
      }
    }
#endif
  }

  pMnode->version = TSDB_MNODE_BUILTIN_DATA_VERSION;
  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}
1003

1004
/* True when the mnode's data version is greater than `version`. */
bool mndNeedUpgrade(SMnode *pMnode, int32_t version) {
  return version < pMnode->version;
}
514,127✔
1005

1006
/*
 * Current data version of the mnode. It starts as the version from the open
 * options and becomes TSDB_MNODE_BUILTIN_DATA_VERSION once mndStart() runs.
 */
int32_t mndGetVersion(SMnode *pMnode) {
  int32_t version = pMnode->version;
  return version;
}
406,429✔
1007

1008
/* Encryption flag copied from the open options (SMnodeOpt.encrypted). */
int32_t mndGetEncryptedFlag(SMnode *pMnode) {
  int32_t flag = pMnode->encrypted;
  return flag;
}
406,429✔
1009

1010
/* Delegate to syncIsCatchUp() for this mnode's sync node. */
int32_t mndIsCatchUp(SMnode *pMnode) { return syncIsCatchUp(pMnode->syncMgmt.sync); }
1014

1015
/* Delegate to syncGetRole() for this mnode's sync node. */
ESyncRole mndGetRole(SMnode *pMnode) { return syncGetRole(pMnode->syncMgmt.sync); }
1019

1020
/* Delegate to syncGetTerm() for this mnode's sync node. */
int64_t mndGetTerm(SMnode *pMnode) { return syncGetTerm(pMnode->syncMgmt.sync); }
1024

1025
/* Fetch the arbitrator token of this mnode's sync node into outToken (via syncGetArbToken). */
int32_t mndGetArbToken(SMnode *pMnode, char *outToken) {
  int64_t rid = pMnode->syncMgmt.sync;
  return syncGetArbToken(rid, outToken);
}
29,082,028✔
1026

1027
/*
 * Stop the mnode. It is first marked stopped, so mndCheckMnodeState rejects
 * further requests with TSDB_CODE_APP_IS_STOPPING. Then sync is stopped and
 * the timer is cleaned up.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);   // must come first so new requests are refused
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}
514,127✔
1032

1033
/*
 * Hand a sync-layer message to the mnode's sync node.
 * A failure is logged and its code is returned; success returns 0.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  int64_t         syncRid = pMnode->syncMgmt.sync;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the mG* log macros

  mGTrace("vgId:1, process sync msg:%p, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(syncRid, pMsg);
  if (code == 0) {
    return code;
  }

  mGError("vgId:1, failed to process sync msg:%p type:%s since %s, code:0x%x", pMsg, TMSG_INFO(pMsg->msgType),
          tstrerror(code), code);
  return code;
}
1048

1049
/*
 * Decide whether an incoming message may be processed by this mnode.
 *
 * These messages always pass, without taking a reference:
 *   - non-request messages;
 *   - scheduler query/fetch messages.
 *
 * Other requests pass only if the mnode is not stopped, is the sync leader
 * and is restored both at the sync layer and in pMnode. When they pass,
 * pMnode->rpcRef is atomically incremented.
 *
 * Otherwise an error code is returned:
 *   - TSDB_CODE_APP_IS_STOPPING: the mnode is stopping;
 *   - the terrno set by syncGetState(): the sync state could not be read;
 *   - TSDB_CODE_SYN_NOT_LEADER / TSDB_CODE_SYN_RESTORING: not leader, or not
 *     yet restored.
 *
 * For the last two, non-timer requests get the current mnode epset
 * serialized into pMsg->info.rsp so the sender can redirect. Timer messages
 * are only traced.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  int32_t code = 0;
  if (!IsReq(pMsg)) TAOS_RETURN(code);
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK ||
      pMsg->msgType == TDMT_SCH_TASK_NOTIFY) {
    TAOS_RETURN(code);
  }

  SMnode *pMnode = pMsg->info.node;
  (void)taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = TSDB_CODE_APP_IS_STOPPING;
    TAOS_RETURN(code);
  }

  // syncGetState() reports failures only through terrno.
  terrno = 0;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  if (terrno != 0) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = terrno;
    TAOS_RETURN(code);
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = TSDB_CODE_SYN_NOT_LEADER;
    goto _OVER;
  }

  if (!state.restored || !pMnode->restored) {
    (void)taosThreadRwlockUnlock(&pMnode->lock);
    code = TSDB_CODE_SYN_RESTORING;
    goto _OVER;
  }

  // Admitted: count this request as in flight (taken while holding the lock).
  (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
  (void)taosThreadRwlockUnlock(&pMnode->lock);
  TAOS_RETURN(code);

_OVER:
  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
      pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
      pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
      pMsg->msgType == TDMT_MND_SSMIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER ||
      pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER || pMsg->msgType == TDMT_MND_CHECK_STREAM_TIMER ||
      pMsg->msgType == TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER || pMsg->msgType == TDMT_MND_SCAN_TIMER ||
      pMsg->msgType == TDMT_MND_QUERY_TRIM_TIMER || pMsg->msgType == TDMT_MND_AUTH_HB_TIMER) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    TAOS_RETURN(code);
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mGDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d, type:%s",
      pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));

  if (epSet.numOfEps <= 0) {
    // Nothing to redirect to. Still report the real reason; a bare -1 here
    // used to lose NOT_LEADER/RESTORING and was not a valid TAOS error code.
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
  }

  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(contLen);
  if (pMsg->info.rsp != NULL) {
    if (tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet) < 0) {
      mError("failed to serialize ep set");
    }
    pMsg->info.hasEpSet = 1;
    pMsg->info.rspLen = contLen;
  }

  TAOS_RETURN(code);
}
1140

1141
/*
 * Dispatch an RPC request to its registered mnode handler.
 *
 * Enterprise builds first validate token-authenticated connections (all
 * message types except TDMT_MND_HEARTBEAT). The token must:
 *   - exist in the token cache;
 *   - be enabled;
 *   - not be past expireTime + TSDB_TOKEN_EXPIRY_LEEWAY.
 * The token's user is then copied into pMsg->info.conn.user.
 *
 * The handler is looked up in msgFp, falling back to msgFpExt. Once
 * mndCheckMnodeState() admits the message, the handler runs and
 * mndReleaseRpc() is called. The handler's code is then returned;
 * TSDB_CODE_ACTION_IN_PROGRESS means no immediate response.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo *pQueueInfo) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the mG* log macros
  int32_t         code = TSDB_CODE_SUCCESS;

#ifdef TD_ENTERPRISE
  if (pMsg->info.conn.isToken && pMsg->msgType != TDMT_MND_HEARTBEAT) {
    SCachedTokenInfo ti = {0};
    if (mndGetCachedTokenInfo(pMsg->info.conn.identifier, &ti) == NULL) {
      mGError("msg:%p, failed to get token info, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(TSDB_CODE_MND_TOKEN_NOT_EXIST);
    }
    if (ti.enabled == 0) {
      mGError("msg:%p, token is disabled, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(TSDB_CODE_MND_TOKEN_DISABLED);
    }
    bool expired = ti.expireTime > 0 && taosGetTimestampSec() > (ti.expireTime + TSDB_TOKEN_EXPIRY_LEEWAY);
    if (expired) {
      mGError("msg:%p, token is expired, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(TSDB_CODE_MND_TOKEN_EXPIRED);
    }
    tstrncpy(pMsg->info.conn.user, ti.user, sizeof(pMsg->info.conn.user));
  }
#endif

  MndMsgFp    fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  MndMsgFpExt fpExt = (fp == NULL) ? pMnode->msgFpExt[TMSG_INDEX(pMsg->msgType)] : NULL;
  if (fp == NULL && fpExt == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(TSDB_CODE_MSG_NOT_PROCESSED);
  }

  TAOS_CHECK_RETURN(mndCheckMnodeState(pMsg));

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  code = (fp != NULL) ? (*fp)(pMsg) : (*fpExt)(pMsg, pQueueInfo);
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code != 0) {
    // Some handlers still return -1 and leave the real error in terrno.
    if (code == -1) {
      code = terrno;
    }
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  } else {
    mGTrace("msg:%p, successfully processed", pMsg);
  }

  TAOS_RETURN(code);
}
1203

1204
/* Register `fp` as the handler for msgType. Out-of-range message indices are ignored. */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t idx = TMSG_INDEX(msgType);
  if (idx >= TDMT_MAX) {
    return;
  }
  pMnode->msgFp[idx] = fp;
}
123,941,962✔
1210

1211
/*
 * Register an extended handler (one that also receives the queue info) for
 * msgType. Out-of-range message indices are ignored.
 */
void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp) {
  tmsg_t idx = TMSG_INDEX(msgType);
  if (idx >= TDMT_MAX) {
    return;
  }
  pMnode->msgFpExt[idx] = fp;
}
4,114,256✔
1217

1218
/*
 * Generate a positive, non-zero uid for `name`.
 *
 * Layout before sign folding (bits, high to low):
 *   [63..24] low 40 bits of the current microsecond timestamp
 *   [23.. 8] low 16 bits of MurmurHash3_32(name, len)
 *   [ 7.. 0] low 8 bits of taosRand()
 *
 * A composed value with bit 63 set is folded to its two's-complement
 * magnitude; this is what llabs() of the signed value used to produce.
 *
 * The arithmetic is done in uint64_t. The old code shifted a signed int64
 * into the sign bit (undefined behavior) and could call llabs(INT64_MIN)
 * (also undefined). Values of 0 and 2^63 are rejected and regenerated.
 *
 * Note: uid 0 is reserved.
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);
  do {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t raw = ((us & 0x000000FFFFFFFFFFULL) << 24) | ((uint64_t)(hashval & 0xFFFFu) << 8) |
                   ((uint64_t)taosRand() & 0xFFu);

    // Fold into the positive int64 range (same result as llabs of the signed value).
    if (raw > (uint64_t)INT64_MAX) {
      raw = UINT64_C(0) - raw;
    }
    if (raw != 0 && raw <= (uint64_t)INT64_MAX) {
      return (int64_t)raw;
    }
  } while (true);
}
1230

1231
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
80✔
1232
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
1233
  int32_t code = mndAcquireRpc(pMnode);
80✔
1234
  if (code < 0) {
80✔
1235
    TAOS_RETURN(code);
×
1236
  } else if (code == 1) {
80✔
1237
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1238
  }
1239

1240
  SSdb   *pSdb = pMnode->pSdb;
80✔
1241
  int64_t ms = taosGetTimestampMs();
80✔
1242

1243
  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
80✔
1244
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
80✔
1245
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
80✔
1246
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
80✔
1247
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
80✔
1248
      pStbInfo->stbs == NULL) {
80✔
1249
    mndReleaseRpc(pMnode);
×
1250
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1251
    if (terrno != 0) code = terrno;
×
1252
    TAOS_RETURN(code);
×
1253
  }
1254

1255
  // cluster info
1256
  tstrncpy(pClusterInfo->version, td_version, sizeof(pClusterInfo->version));
80✔
1257
  pClusterInfo->monitor_interval = tsMonitorInterval;
80✔
1258
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
80✔
1259
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
80✔
1260
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
80✔
1261
  pClusterInfo->topics_toal = sdbGetSize(pSdb, SDB_TOPIC);
80✔
1262
  pClusterInfo->streams_total = sdbGetSize(pSdb, SDB_STREAM);
80✔
1263

1264
  void *pIter = NULL;
80✔
1265
  while (1) {
80✔
1266
    SDnodeObj *pObj = NULL;
160✔
1267
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
160✔
1268
    if (pIter == NULL) break;
160✔
1269

1270
    SMonDnodeDesc desc = {0};
80✔
1271
    desc.dnode_id = pObj->id;
80✔
1272
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
80✔
1273
    if (mndIsDnodeOnline(pObj, ms)) {
80✔
1274
      tstrncpy(desc.status, "ready", sizeof(desc.status));
80✔
1275
    } else {
1276
      tstrncpy(desc.status, "offline", sizeof(desc.status));
×
1277
    }
1278
    if (taosArrayPush(pClusterInfo->dnodes, &desc) == NULL) {
160✔
1279
      mError("failed put dnode into array, but continue at this monitor report")
×
1280
    }
1281
    sdbRelease(pSdb, pObj);
80✔
1282
  }
1283

1284
  pIter = NULL;
80✔
1285
  while (1) {
80✔
1286
    SMnodeObj *pObj = NULL;
160✔
1287
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
160✔
1288
    if (pIter == NULL) break;
160✔
1289

1290
    SMonMnodeDesc desc = {0};
80✔
1291
    desc.mnode_id = pObj->id;
80✔
1292
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
80✔
1293

1294
    if (pObj->id == pMnode->selfDnodeId) {
80✔
1295
      pClusterInfo->first_ep_dnode_id = pObj->id;
80✔
1296
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
80✔
1297
      // pClusterInfo->master_uptime = (float)mndGetClusterUpTime(pMnode) / 86400.0f;
1298
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
80✔
1299
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
1300
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
80✔
1301
      desc.syncState = TAOS_SYNC_STATE_LEADER;
80✔
1302
    } else {
1303
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
×
1304
      desc.syncState = pObj->syncState;
×
1305
    }
1306
    if (taosArrayPush(pClusterInfo->mnodes, &desc) == NULL) {
160✔
1307
      mError("failed to put mnode into array, but continue at this monitor report");
×
1308
    }
1309
    sdbRelease(pSdb, pObj);
80✔
1310
  }
1311

1312
  // vgroup info
1313
  pIter = NULL;
80✔
1314
  while (1) {
480✔
1315
    SVgObj *pVgroup = NULL;
560✔
1316
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
560✔
1317
    if (pIter == NULL) break;
560✔
1318

1319
    if (pVgroup->mountVgId) {
480✔
1320
      sdbRelease(pSdb, pVgroup);
×
1321
      continue;
×
1322
    }
1323

1324
    pClusterInfo->vgroups_total++;
480✔
1325
    pClusterInfo->tbs_total += pVgroup->numOfTables;
480✔
1326

1327
    SMonVgroupDesc desc = {0};
480✔
1328
    desc.vgroup_id = pVgroup->vgId;
480✔
1329

1330
    SName name = {0};
480✔
1331
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
480✔
1332
    if (code < 0) {
480✔
1333
      mError("failed to get db name since %s", tstrerror(code));
×
1334
      sdbCancelFetch(pSdb, pIter);
×
1335
      sdbRelease(pSdb, pVgroup);
×
1336
      TAOS_RETURN(code);
×
1337
    }
1338
    (void)tNameGetDbName(&name, desc.database_name);
480✔
1339

1340
    desc.tables_num = pVgroup->numOfTables;
480✔
1341
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
480✔
1342
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
480✔
1343
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
960✔
1344
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
480✔
1345
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
480✔
1346
      pVnDesc->dnode_id = pVgid->dnodeId;
480✔
1347
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
480✔
1348
      pVnDesc->syncState = pVgid->syncState;
480✔
1349
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
480✔
1350
        tstrncpy(desc.status, "ready", sizeof(desc.status));
480✔
1351
        pClusterInfo->vgroups_alive++;
480✔
1352
      }
1353
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
480✔
1354
        pClusterInfo->vnodes_alive++;
480✔
1355
      }
1356
      pClusterInfo->vnodes_total++;
480✔
1357
    }
1358

1359
    if (taosArrayPush(pVgroupInfo->vgroups, &desc) == NULL) {
960✔
1360
      mError("failed to put vgroup into array, but continue at this monitor report")
×
1361
    }
1362
    sdbRelease(pSdb, pVgroup);
480✔
1363
  }
1364

1365
  // stb info
1366
  pIter = NULL;
80✔
1367
  while (1) {
80✔
1368
    SStbObj *pStb = NULL;
160✔
1369
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
160✔
1370
    if (pIter == NULL) break;
160✔
1371

1372
    SMonStbDesc desc = {0};
80✔
1373

1374
    SName name1 = {0};
80✔
1375
    code = tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
80✔
1376
    if (code < 0) {
80✔
1377
      mError("failed to get db name since %s", tstrerror(code));
×
1378
      sdbRelease(pSdb, pStb);
×
1379
      TAOS_RETURN(code);
×
1380
    }
1381
    (void)tNameGetDbName(&name1, desc.database_name);
80✔
1382

1383
    SName name2 = {0};
80✔
1384
    code = tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
80✔
1385
    if (code < 0) {
80✔
1386
      mError("failed to get table name since %s", tstrerror(code));
×
1387
      sdbRelease(pSdb, pStb);
×
1388
      TAOS_RETURN(code);
×
1389
    }
1390
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);
80✔
1391

1392
    if (taosArrayPush(pStbInfo->stbs, &desc) == NULL) {
160✔
1393
      mError("failed to put stb into array, but continue at this monitor report");
×
1394
    }
1395
    sdbRelease(pSdb, pStb);
80✔
1396
  }
1397

1398
  // grant info
1399
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
80✔
1400
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
80✔
1401
  if (pMnode->grant.expireTimeMS == 0) {
80✔
1402
    pGrantInfo->expire_time = 0;
×
1403
    pGrantInfo->timeseries_total = 0;
×
1404
  }
1405

1406
  mndReleaseRpc(pMnode);
80✔
1407
  TAOS_RETURN(code);
80✔
1408
}
1409

1410
/*
 * Ask the sync layer to reset this mnode's timers. The election interval is
 * tsMnodeElectIntervalMs and the heartbeat interval is
 * tsMnodeHeartbeatIntervalMs.
 */
int32_t mndResetTimer(SMnode *pMnode) {
  int64_t syncRid = pMnode->syncMgmt.sync;
  return syncResetTimer(syncRid, tsMnodeElectIntervalMs, tsMnodeHeartbeatIntervalMs);
}
1413

1414
/*
 * Report the mnode's sync role, restore flag, term and role time into
 * pLoad. Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  mTrace("mnode get load");

  SSyncState st = syncGetState(pMnode->syncMgmt.sync);
  pLoad->syncState = st.state;
  pLoad->syncRestore = st.restored;
  pLoad->syncTerm = st.term;
  pLoad->roleTimeMs = st.roleTimeMs;

  mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
         syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
  return 0;
}
1425

1426
/* The roleTimeMs field of this mnode's current sync state. */
int64_t mndGetRoleTimeMs(SMnode *pMnode) { return syncGetState(pMnode->syncMgmt.sync).roleTimeMs; }
1430

1431
/*
 * Set pMnode->restored under the mnode write lock.
 *
 * When clearing the flag, mndCheckMnodeState stops admitting new requests.
 * This call then polls every 3 ms until pMnode->rpcRef drops to 0 or below,
 * i.e. until the references counted for in-flight requests are released.
 *
 * rpcRef is updated by other threads with atomic operations, so the wait
 * loop now reads it with atomic_load_32 instead of a plain (racy) read.
 */
void mndSetRestored(SMnode *pMnode, bool restored) {
  (void)taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  (void)taosThreadRwlockUnlock(&pMnode->lock);
  mInfo("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  while (atomic_load_32(&pMnode->rpcRef) > 0) {
    taosMsleep(3);
  }
}
514,127✔
1448

1449
/* Current value of pMnode->restored (read without taking pMnode->lock). */
bool mndGetRestored(SMnode *pMnode) {
  bool restored = pMnode->restored;
  return restored;
}
×
1450

1451
/*
 * Mark the mnode as stopped under the write lock. Afterwards
 * mndCheckMnodeState rejects requests with TSDB_CODE_APP_IS_STOPPING.
 */
void mndSetStop(SMnode *pMnode) {
  TdThreadRwlock *pLock = &pMnode->lock;

  (void)taosThreadRwlockWrlock(pLock);
  pMnode->stopped = true;
  (void)taosThreadRwlockUnlock(pLock);

  mInfo("mnode set stopped");
}
514,127✔
1457

1458
/* Current value of pMnode->stopped (read without taking pMnode->lock). */
bool mndGetStop(SMnode *pMnode) {
  bool stopped = pMnode->stopped;
  return stopped;
}
731,897,174✔
1459

1460
/*
 * Store the separation-of-duties phase under the write lock. No validation
 * is done here; mndGetSoDPhase sanitizes out-of-range values on read.
 */
void mndSetSoDPhase(SMnode *pMnode, int8_t phase) {
  TdThreadRwlock *pLock = &pMnode->lock;

  (void)taosThreadRwlockWrlock(pLock);
  pMnode->sodPhase = phase;
  (void)taosThreadRwlockUnlock(pLock);
}
372✔
1465

1466
/*
 * Read the separation-of-duties phase under the read lock. A value outside
 * [TSDB_SOD_PHASE_STABLE, TSDB_SOD_PHASE_ENFORCE] is logged and reported as
 * TSDB_SOD_PHASE_STABLE.
 */
int8_t mndGetSoDPhase(SMnode *pMnode) {
  (void)taosThreadRwlockRdlock(&pMnode->lock);
  int8_t phase = pMnode->sodPhase;
  (void)taosThreadRwlockUnlock(&pMnode->lock);

  bool inRange = phase >= TSDB_SOD_PHASE_STABLE && phase <= TSDB_SOD_PHASE_ENFORCE;
  if (!inRange) {
    mWarn("invalid SoD phase:%d, reset to stable", phase);
    phase = TSDB_SOD_PHASE_STABLE;
  }
  return phase;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc