• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.25
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
static const char *offlineReason[] = {
42
    "",
43
    "status msg timeout",
44
    "status not received",
45
    "version not match",
46
    "dnodeId not match",
47
    "clusterId not match",
48
    "statusInterval not match",
49
    "timezone not match",
50
    "locale not match",
51
    "charset not match",
52
    "ttlChangeOnWrite not match",
53
    "enableWhiteList not match",
54
    "encryptionKey not match",
55
    "monitor not match",
56
    "monitor switch not match",
57
    "monitor interval not match",
58
    "monitor slow log threshold not match",
59
    "monitor slow log sql max len not match",
60
    "monitor slow log scope not match",
61
    "unknown",
62
};
63

64
enum {
65
  DND_ACTIVE_CODE,
66
  DND_CONN_ACTIVE_CODE,
67
};
68

69
enum {
70
  DND_CREATE,
71
  DND_ADD,
72
  DND_DROP,
73
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
#ifdef _GRANT
100
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
101
#else
102
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
103
#endif
104

105
/*
 * Register everything the mnode needs to manage dnode objects:
 * the SDB table callbacks, the RPC message handlers and the SHOW handlers.
 * Returns whatever sdbSetTable() returns for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* SDB callbacks for the dnode table; fields not assigned below stay zero. */
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  /* RPC message handlers */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* SHOW retrieve / iterator-cancel handlers */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
136

137
/* Counterpart of mndInitDnode(); currently there is nothing to tear down. */
void mndCleanupDnode(SMnode *pMnode) {
}
502,637✔
138

139
/*
 * Deploy callback of the dnode table: builds dnode 1 from the local
 * fqdn/port, encodes it and submits it through a "create-dnode" transaction.
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  STrans  *pTrans = NULL;
  SSdbRaw *pRaw = NULL;
  char    *machineId = NULL;

  /* Describe the first dnode using the local endpoint. */
  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.port = tsServerPort;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId == NULL) {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    /* This build configuration refuses to deploy without a machine id. */
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  } else {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    /* Prefer the error recorded in terrno, fall back to a generic code. */
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  pRaw = NULL; /* no longer freed locally once appended to the transaction */

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
190

191
/*
 * Serialize a dnode object into a newly allocated SDB raw record
 * (version TSDB_DNODE_VER_NUMBER).
 * Returns the raw record, or NULL with terrno set when allocation or any
 * field write fails.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  /* Assume failure until every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
223

224
/*
 * Rebuild a dnode row from an SDB raw record.
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER; for versions above 1 two
 * length-prefixed trailing fields are read and discarded.
 * Returns the new row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SDnodeObj *pDnode = NULL;
  SSdbRow   *pRow = NULL;

  /* Assume failure until the record has been decoded completely. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    /* Two (length, payload) pairs whose payload is skipped. */
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
275

276
/*
 * SDB insert callback: marks the dnode as "status not received" and derives
 * its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char epBuf[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  /* Format into a scratch buffer first, then copy into the object. */
  (void)snprintf(epBuf, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);

  return 0;
}
285

286
/* SDB delete callback: only traces the event. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t rc = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return rc;
}
290

291
/*
 * SDB update callback: copies updateTime from the new row into the existing
 * one; the machine id is copied as well when TD_ENTERPRISE or TD_ASTRA_TODO
 * is defined. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pOld->updateTime = pNew->updateTime;
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#else
  pOld->updateTime = pNew->updateTime;
#endif

  return 0;
}
299

300
/*
 * Acquire the dnode with the given id from the SDB.
 * On failure returns NULL and maps the SDB error in terrno to the
 * corresponding MND dnode error; any other error becomes TSDB_CODE_APP_ERROR.
 * A non-NULL result is handed back with mndReleaseDnode().
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SDnodeObj *pDnode = sdbAcquire(pMnode->pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  const int32_t sdbErr = terrno;
  if (sdbErr == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
  }

  return pDnode;
}
318

319
/* Hand a dnode object back to the SDB via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
155,946,308✔
323

324
/*
 * Build an endpoint set holding the dnode's fqdn/port.
 * The result code of addEpIntoEpSet() is stored in terrno.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = rc;
  return result;
}
329

330
/*
 * Look up a dnode by id and return its endpoint set.
 * Returns a zeroed SEpSet when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pFound = mndAcquireDnode(pMnode, dnodeId);

  if (pFound != NULL) {
    result = mndGetDnodeEpset(pFound);
    mndReleaseDnode(pMnode, pFound);
  }

  return result;
}
340

341
/*
 * Walk the dnode table with sdbFetch() and return the first dnode whose
 * endpoint string matches pEpStr (case-insensitive, at most TSDB_EP_LEN chars).
 * The match is returned without being released here; on a miss returns NULL
 * with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      /* table exhausted without a match */
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
361

362
/*
 * Same lookup as mndAcquireDnodeByEp() but iterating with sdbFetchAll(), which
 * also reports each object's status. Returns the matching dnode (not released
 * here) or NULL when no endpoint matches; terrno is not modified on a miss.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    ESdbStatus status = 0;
    SDnodeObj *pCandidate = NULL;

    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &status, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
382

383
/* Size of the SDB_DNODE table as reported by sdbGetSize(). */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
387

388
/* Size of the SDB_DB table as reported by sdbGetSize(). */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
392

393
/*
 * A dnode counts as online while the distance between its lastAccessTime and
 * curMs does not exceed tsStatusTimeoutMs.
 * Side effect: when the dnode is found offline, has a rebootTime > 0 and is
 * still flagged DND_REASON_ONLINE, its offlineReason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t silentMs = TABS(pDnode->lastAccessTime - curMs);
  if (silentMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
403

404
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 * A failed array push is logged and the iteration continues (best effort).
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Copy everything needed out of the object before releasing it. */
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be touched after sdbRelease(); use the copied id. */
    dnodeEp.isMnode = 0;
    if (mndIsMnode(pMnode, dnodeEp.id)) {
      dnodeEp.isMnode = 1;
    }
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,665,617✔
430

431
/*
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 * Returns 0 on success; if an array push fails the iteration is cancelled and
 * the terrno observed after the failed push is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Zero-initialize so no field is copied into the array with an
     * indeterminate value. */
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be touched after sdbRelease(); use the copied id. */
    if (mndIsMnode(pMnode, dInfo.id)) {
      dInfo.isMnode = 1;
    } else {
      dInfo.isMnode = 0;
    }

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
463

464
/*
 * Compare one monitor parameter reported in pCfg->monitorParas against the
 * local global of the same name. On mismatch: log it, store `err` in terrno
 * and return `err` from the enclosing function.
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site.
 * Wrapped in do { } while (0) so the expansion is a single statement and
 * cannot capture a following `else`; always terminate the use with `;`.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = (err);                                                                                      \
      return (err);                                                                                        \
    }                                                                                                      \
  } while (0)
470

471
/*
 * Compare the cluster configuration reported by a dnode (pCfg) with the local
 * settings. Returns DND_REASON_ONLINE when everything matches, otherwise the
 * DND_REASON_* value of the first mismatch; terrno is set on every mismatch.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Monitor related parameters (the macro returns on mismatch). */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  /* A timezone difference is only rejected when checkTime differs as well. */
  const bool tzDiffers = (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0);
  if (tzDiffers && (pMnode->checkTime != pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* Normalize the local switch to 0/1 before comparing. */
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* The encryption key is not compared while the encrypting flag is set. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
536

537
/*
 * Applied entries per millisecond between two samples.
 * Returns 0.0 unless both the timestamp and the counter strictly increased.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    const int64_t elapsedMs = currentTimeMs - lastTimeMs;
    const int64_t appliedDelta = currentCount - lastCount;
    return (double)appliedDelta / (double)elapsedMs;
  }
  return 0.0;
}
547

548
/*
 * Merge the load reported for one vnode replica (pVload) into the stored
 * replica state (pGid).
 * Sync indexes and buffer usage are always refreshed; the applied rate is
 * recomputed while the commit index is ahead of the applied index.
 * Returns true when role, term, restore/canRead flags or start time changed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  /* Evaluate the role change before any field is overwritten. */
  const bool termChanged = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleChanged =
      (pGid->syncState != pVload->syncState) || termChanged || (pGid->roleTimeMs != pVload->roleTimeMs);

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      /* first sample: only remember when it was taken */
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  const bool changed = roleChanged || (pGid->syncRestore != pVload->syncRestore) ||
                       (pGid->syncCanRead != pVload->syncCanRead) || (pGid->startTimeMs != pVload->startTimeMs);
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
587

588
/*
 * Merge the reported mnode load (pMload) into the stored mnode object (pObj).
 * Returns true when sync state, term, role time or restore flag changed, in
 * which case the stored values are overwritten with the reported ones.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool termChanged = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleChanged =
      (pObj->syncState != pMload->syncState) || termChanged || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
605

606
extern char   *tsMonFwUri;
607
extern char   *tsMonSlowLogUri;
608
/*
 * Handle TDMT_MND_STATIS: deserialize the request and forward its content via
 * monSendContent() to tsMonFwUri (counter type) or tsMonSlowLogUri (slow-log
 * type). Other types are dropped.
 * Returns the deserialization error if any, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  /* Pick the destination by request type. */
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else {
    if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
      monSendContent(statisReq.pCont, tsMonSlowLogUri);
    }
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
628

629
/*
 * Handle TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserialize the request and add it as an audit record.
 * Returns the deserialization error if any, otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || !tsEnableAuditDelete) {
    return 0;
  }

  SAuditReq auditReq = {0};
  SMnode   *pMnode = pReq->info.node;

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
646

647
/*
 * Serialize the dnode's id and machine id into an SDnodeInfoReq and put it on
 * the mnode write queue as a TDMT_MND_UPDATE_DNODE_INFO message.
 * Returns 0 on success or a negative error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  /* First pass with a NULL buffer only computes the encoded length. */
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    /* Never report success for an empty encoding, and do not leak the
     * buffer: nothing else owns it before it has been queued. */
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  /* NOTE(review): assumes tmsgPutToQueue() disposes of pCont itself when it
   * fails - confirm against its implementation. */
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
677

678
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the dnode's updateTime and
 * persist the object through an "update-dnode-obj" transaction.
 * Returns 0 on success or the error code of the failing step.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SSdbRaw      *pCommitRaw = NULL;
  STrans       *pTrans = NULL;
  SDnodeObj    *pDnode = NULL;
  SDnodeInfoReq infoReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  pCommitRaw = NULL; /* no longer freed locally once appended to the transaction */

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
724

725
// Handle a periodic status (heartbeat) message from a dnode.
//
// Steps, in order:
//   1. decode the request and reject it when it carries a different, non-zero cluster id;
//   2. locate the dnode object (by endpoint for a first access with dnodeId == 0, otherwise by id
//      with endpoint-based fallbacks);
//   3. copy the reported vnode/mnode/qnode load into the in-memory objects;
//   4. when anything relevant changed (needCheck), validate version / cluster id / cluster config,
//      refresh the dnode attributes and build an SStatusRsp in pReq->info.rsp;
//   5. update the access bookkeeping of the dnode.
//
// The value returned is always the result of mndUpdClusterInfo(pReq); the local `code` and the
// terrno values set on the early-exit paths are not returned directly.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // First access: the dnode does not know its id yet, so look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under another id: record the reason and reject.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  // A full check (and a response body) is only needed when something the dnode cares about differs.
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Fold the reported per-vnode load into the vgroup objects.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only a (assigned) leader's figures overwrite the vgroup-level statistics.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    // A changed machine id is written back through mndUpdateDnodeObj.
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    // First pass computes the encoded size; never hand a negative size to the allocator.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      if (terrno == 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never attached to pReq, so it has to be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
952

953
// Handle a notify message from a dnode: copy the reported time-series count of every listed
// vnode into its vgroup object, then refresh the cluster info via mndUpdClusterInfo.
//
// Returns the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER for a foreign cluster id,
// or otherwise the result of mndUpdClusterInfo(pReq).
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SNotifyReq notifyReq = {0};
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  // A zero cluster id in the request is accepted; any other value must match ours.
  int64_t clusterid = mndGetClusterId(pMnode);
  if (!(notifyReq.clusterId == 0 || notifyReq.clusterId == clusterid)) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t idx = 0; idx < numOfLoads; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      continue;  // unknown vgroup: nothing to update
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
986

987
// Build a new dnode object from a create request and submit it through a "create-dnode"
// transaction (rollback policy, global conflict check).
//
// pMnode:  mnode whose sdb supplies the next dnode id.
// pReq:    originating RPC request, attached to the transaction.
// pCreate: decoded request providing fqdn and port.
// Returns 0 once the transaction has been prepared, otherwise the failing step's error code.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pRaw = NULL;
  SDnodeObj newDnode = {0};

  // Fill in the in-memory object that will be encoded into the commit log.
  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  const int64_t nowMs = taosGetTimestampMs();
  newDnode.createdTime = nowMs;
  newDnode.updateTime = nowMs;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&newDnode);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  pRaw = NULL;  // no longer freed by the cleanup below

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1027

1028
// Handle a "dnode list" request: collect id, fqdn and port of every dnode in the sdb and return
// them as a serialized SDnodeListRsp in pReq->info.rsp.
//
// Returns 0 on success; on failure no response buffer is attached to pReq and the error code of
// the failing step is returned.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: drop the current row and cancel the cursor.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded size; a negative value is an error, not a size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
  if (rspLen <= 0) {
    // An empty encoding must not be reported as success (code would otherwise become 0).
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    // The buffer was never attached to pReq, so release it here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1089

1090
void getSlowLogScopeString(int32_t scope, char *result) {
2,513✔
1091
  if (scope == SLOW_LOG_TYPE_NULL) {
2,513✔
1092
    (void)strncat(result, "NONE", 64);
×
1093
    return;
×
1094
  }
1095
  while (scope > 0) {
5,026✔
1096
    if (scope & SLOW_LOG_TYPE_QUERY) {
2,513✔
1097
      (void)strncat(result, "QUERY", 64);
2,513✔
1098
      scope &= ~SLOW_LOG_TYPE_QUERY;
2,513✔
1099
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1100
      (void)strncat(result, "INSERT", 64);
×
1101
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1102
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1103
      (void)strncat(result, "OTHERS", 64);
×
1104
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1105
    } else {
1106
      (void)printf("invalid slow log scope:%d", scope);
×
1107
      return;
×
1108
    }
1109

1110
    if (scope > 0) {
2,513✔
1111
      (void)strncat(result, "|", 64);
×
1112
    }
1113
  }
1114
}
1115

1116
// Handle a CREATE DNODE request.
//
// Checks the dnode and cpu-core grants, decodes the request, checks the caller's privilege,
// validates the endpoint, rejects an endpoint that already exists and finally starts the
// create transaction. An audit record is written whenever the create step has been attempted.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise an error code.
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  int32_t         lino = 0;
  int32_t         code = -1;

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), &lino, _OVER);

  // The endpoint needs a non-empty fqdn and a port in (0, UINT16_MAX].
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // NOTE: the taosValidFqdn() check on createReq.fqdn is currently disabled.

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    tsGrantHBInterval = 5;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  char auditObj[200] = {0};
  (void)tsnprintf(auditObj, sizeof(auditObj), "%s:%d", createReq.fqdn, createReq.port);
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", auditObj, createReq.sql, createReq.sqlLen);

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1173

1174
// Implementation hook for RESTORE DNODE; when TD_ENTERPRISE is not defined the stub below is used.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for a restore-dnode request: forwards to the implementation hook.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and reports success.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1181

1182
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
13,026✔
1183
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1184
  int32_t  code = -1;
13,026✔
1185
  SSdbRaw *pRaw = NULL;
13,026✔
1186
  STrans  *pTrans = NULL;
13,026✔
1187
  int32_t  lino = 0;
13,026✔
1188

1189
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
13,026✔
1190
  if (pTrans == NULL) {
13,026✔
1191
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1192
    if (terrno != 0) code = terrno;
×
1193
    goto _OVER;
×
1194
  }
1195
  mndTransSetGroupParallel(pTrans);
13,026✔
1196
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
13,026✔
1197
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
13,026✔
1198

1199
  pRaw = mndDnodeActionEncode(pDnode);
13,026✔
1200
  if (pRaw == NULL) {
13,026✔
1201
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1202
    if (terrno != 0) code = terrno;
×
1203
    goto _OVER;
×
1204
  }
1205
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
13,026✔
1206
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
13,026✔
1207
  pRaw = NULL;
13,026✔
1208

1209
  pRaw = mndDnodeActionEncode(pDnode);
13,026✔
1210
  if (pRaw == NULL) {
13,026✔
1211
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1212
    if (terrno != 0) code = terrno;
×
1213
    goto _OVER;
×
1214
  }
1215
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
13,026✔
1216
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
13,026✔
1217
  pRaw = NULL;
13,026✔
1218

1219
  if (pSObj != NULL) {
13,026✔
1220
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,147✔
1221
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
1,147✔
1222
  }
1223

1224
  if (pMObj != NULL) {
13,026✔
1225
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
645✔
1226
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
645✔
1227
  }
1228

1229
  if (pQObj != NULL) {
13,026✔
1230
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
355✔
1231
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
355✔
1232
  }
1233

1234
  if (pBObj != NULL) {
13,026✔
1235
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,325✔
1236
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
1,325✔
1237
  }
1238

1239
  if (numOfVnodes > 0) {
11,701✔
1240
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,417✔
1241
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,417✔
1242
  }
1243

1244
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
11,701✔
1245

1246
  code = 0;
11,701✔
1247

1248
_OVER:
13,026✔
1249
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
13,026✔
1250
  mndTransDrop(pTrans);
13,026✔
1251
  sdbFreeRaw(pRaw);
13,026✔
1252
  TAOS_RETURN(code);
13,026✔
1253
}
1254

1255
// Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no vnodes.
// The checks run in that order and stop at the first component that is found.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMn = NULL;
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, dnodeId);

  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMn = mndAcquireMnode(pMnode, dnodeId);
      if (pMn == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  // Release everything that was (or was not) acquired; NULL objects are passed through as before.
  mndReleaseMnode(pMnode, pMn);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1280

1281
// Handle a DROP DNODE request.
//
// The target is looked up by dropReq.dnodeId and, if that fails, by the "fqdn:port" endpoint
// from the request. The drop is refused when
//   - the dnode hosts the only mnode, or the mnode of this very node (the leader),
//   - a mount exists on it and the drop is not forced (USE_MOUNT builds),
//   - it is online but force/unsafe was requested,
//   - one of its vnodes is offline, or the dnode itself is offline, and the drop is not forced.
// Otherwise the drop transaction is started and an audit record is written.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise an error code.
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although a dnode is being
  // dropped - confirm whether a dnode-specific operation code should be used instead.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // The fqdn comes from the request and must never be used as the format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      // Never report success for a dnode that could not be found.
      code = (err != 0) ? err : TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually resolved; it equals dropReq.dnodeId unless the
  // dnode was found through the endpoint fallback above.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // NOTE(review): this and the per-replica message below are emitted at error level on the
  // normal path; they look like diagnostics - confirm the intended log level.
  mError("vnode num:%d", numOfVnodes);

  // Look for a replica on this dnode whose sync state is OFFLINE.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early: the cursor has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1411

1412
/*
 * Send a TDMT_DND_CREATE_ENCRYPT_KEY request to the selected dnodes.
 *
 * pReq     - originating request; supplies the mnode and the trace id.
 * dnodeId  - target dnode id; -1 or 0 selects every dnode.
 * pDcfgReq - config/value pair to forward; value length must lie in
 *            [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
 *
 * Returns 0 on success or an error code. On error, terrno is set to the
 * code if it was still 0.
 *
 * encryptMgmt.encrypting is claimed with a compare-and-exchange and is
 * released here whenever no request was actually sent (nEncrypt <= 0).
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  // NOTE(review): `trace` is not referenced directly; presumably the mG* log macros pick it up - confirm.
  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the "encrypting" flag; a non-zero previous value means another operation holds it.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          // NOTE(review): bufLen == 0 yields code 0, so the caller sees success - confirm this is intended.
          code = bufLen;
          // The buffer was never handed to the transport, so it must be freed here.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // The iteration is abandoned early, so the iterator has to be cancelled explicitly.
          sdbCancelFetch(pSdb, pIter);
          // Leave via break (not goto _exit) so the nEncrypt check below can release the flag.
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Nothing was sent, so no response will ever arrive to clear the flag: clear it now.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1489

1490
/*
 * Handle a "create encrypt key" request.
 *
 * In builds with TD_ENTERPRISE or TD_ASTRA_TODO defined: deserializes the
 * SMCfgDnodeReq from the message, checks the MND_OPER_CONFIG_DNODE
 * privilege, and forwards an "encrypt_key" config to
 * mndProcessCreateEncryptKeyReqImpl(). Any other config name is rejected
 * with TSDB_CODE_PAR_INTERNAL_ERROR. In other builds it returns 0.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE);
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 covers "encrypt_key" plus its terminator, so only an exact (case-insensitive) match passes.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);
  // cfgReq is a stack object; dnodeId is read after the free call, in the same order as before.
  return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1519

1520
/*
 * Handle one dnode's response to a create-encrypt-key request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code,
 * and clears encryptMgmt.encrypting once the total number of responses
 * reaches the number of requests recorded in encryptMgmt.nEncrypt.
 *
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Update this response's counter and read the other one, so completion is judged on the
  // shared totals. Using only the counter touched by this call never reaches nReq when
  // responses are a mix of successes and failures.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  // NOTE(review): `trace` is not referenced directly; presumably mGInfo picks it up - confirm.
  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1544

1545
/*
 * Fill pBlock with (name, value) rows for a fixed set of cluster-level
 * settings: status interval, timezone, locale, charset, monitor options
 * and slow-log options.
 *
 * Column 0 receives the option name and column 1 its value, both as
 * var-strings. Returns the number of rows written and adds the same count
 * to pShow->numOfRows. If a column write fails, the error is logged and the
 * rows written so far are still reported.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t nCfg = 0;
  int32_t written = 0;
  int32_t code = 0;
  int32_t lino = 0;

  /* ---- collect the settings, one slot per option ---- */
  names[nCfg] = "statusIntervalMs";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
  nCfg += 1;

  names[nCfg] = "timezone";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  nCfg += 1;

  names[nCfg] = "locale";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  nCfg += 1;

  names[nCfg] = "charset";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  nCfg += 1;

  names[nCfg] = "monitor";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  nCfg += 1;

  names[nCfg] = "monitorInterval";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  nCfg += 1;

  names[nCfg] = "slowLogThreshold";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  nCfg += 1;

  names[nCfg] = "slowLogMaxLen";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  nCfg += 1;

  char scopeText[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeText);
  names[nCfg] = "slowLogScope";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", scopeText);
  nCfg += 1;

  /* ---- emit one row per collected option ---- */
  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  int32_t idx = 0;
  while (idx < nCfg) {
    SColumnInfoData *pNameCol = NULL;
    SColumnInfoData *pValueCol = NULL;

    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, names[idx], TSDB_CONFIG_OPTION_LEN);
    pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, written, (const char *)nameVar, false), &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, values[idx], TSDB_CONFIG_VALUE_LEN);
    pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, written, (const char *)valueVar, false), &lino, _OVER);

    written += 1;
    idx += 1;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += written;
  return written;
}
1616

1617
/* Cancel hook for the configs listing; intentionally does nothing. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1618

1619
/*
 * Fill pBlock with one row per dnode, up to `rows` rows.
 *
 * Columns, in order: id, endpoint, number of vnodes, supported vnodes,
 * status text, created time, reboot time, offline reason, and (only when
 * TD_ENTERPRISE is defined) machine id.
 *
 * Iteration state is kept in pShow->pIter so a later call can continue.
 * Returns the number of rows written and adds it to pShow->numOfRows.
 * A failed column write is logged and ends the call with the rows written
 * so far.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status text: a trailing '*' marks a creating/dropping dnode that is also not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason is shown only for dnodes that are not online.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Take the common error exit rather than writing through a NULL buffer.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary buffer before the check so it is not leaked when the check jumps to _OVER.
    int32_t setCode = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(setCode, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1701

1702
/* Cancel an in-progress dnode listing by handing pIter to sdbCancelFetchByType() for SDB_DNODE. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1706

1707
/*
 * Collect a copy of every dnode's fqdn.
 *
 * Returns an array of `char *` elements, each created with taosStrdup(),
 * or NULL if the array itself could not be created. A dnode whose fqdn
 * cannot be duplicated or stored is logged and skipped, so the result may
 * hold fewer entries than there are dnodes.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    // Without an array there is nothing to collect into.
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL || taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The copy was not stored, so free it here; a failed duplicate is never pushed.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc