• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4886

16 Dec 2025 01:13AM UTC coverage: 65.292% (+0.03%) from 65.258%
#4886

push

travis-ci

web-flow
fix: compile error (#33938)

178718 of 273721 relevant lines covered (65.29%)

103311111.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.63
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
/*
 * Human-readable text for each dnode offline reason.
 * NOTE(review): presumably indexed by the DND_REASON_* codes declared
 * elsewhere -- confirm before reordering or inserting entries.
 */
static const char *offlineReason[] = {
    [0] = "",
    [1] = "status msg timeout",
    [2] = "status not received",
    [3] = "version not match",
    [4] = "dnodeId not match",
    [5] = "clusterId not match",
    [6] = "statusInterval not match",
    [7] = "timezone not match",
    [8] = "locale not match",
    [9] = "charset not match",
    [10] = "ttlChangeOnWrite not match",
    [11] = "enableWhiteList not match",
    [12] = "encryptionKey not match",
    [13] = "monitor not match",
    [14] = "monitor switch not match",
    [15] = "monitor interval not match",
    [16] = "monitor slow log threshold not match",
    [17] = "monitor slow log sql max len not match",
    [18] = "monitor slow log scope not match",
    [19] = "unknown",
};

/* Selector between the two kinds of active code. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};

/* Dnode operation kinds. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
// Request handler: the parameter is the incoming request, named pReq like the sibling *Req handlers.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
100
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
101

102
#ifdef _GRANT
103
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
104
#else
105
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
106
#endif
107

108
/**
 * Wires the dnode module into an mnode: registers the RPC message handlers,
 * the "show" retrieve / free-iterator callbacks, and finally the SDB_DNODE
 * table description.
 *
 * @param pMnode mnode the handlers and the table are registered on.
 * @return whatever sdbSetTable() returns for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // RPC message handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  // "show" table callbacks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // SDB table description for dnode rows, keyed by a 32-bit integer
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
141

142
/* Cleanup hook of the dnode module; it has nothing to release. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
499,257✔
143

144
/**
 * Deploy callback of the SDB_DNODE table: creates dnode 1 from the local
 * fqdn/port and commits it through a "create-dnode" transaction.
 *
 * @param pMnode mnode performing the first deployment.
 * @return 0 on success, otherwise an error code. In enterprise builds without
 *         GRANTS_CFG, a missing machine id yields
 *         TSDB_CODE_DNODE_NO_MACHINE_CODE (terrno is set to the same value).
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // Report the failure explicitly instead of returning whatever
    // tGetMachineId() produced, which might be 0 and look like success.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the sdbFreeRaw() below does not free the raw appended above.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
195

196
/**
 * Serializes a dnode object into a freshly allocated SDB raw record
 * (version TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved
 * area, then two zero int16 values kept for forward/backward compatibility.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Pessimistic default; reset to 0 only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
228

229
/**
 * Rebuilds a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed binary fields follow the reserved area; they are read and
 * discarded. After a successful decode tmsgUpdateDnodeInfo() is called with
 * the decoded id/fqdn/port and a non-zero result is logged as an endpoint
 * change.
 *
 * @param pRaw raw record to decode.
 * @return the new row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Pessimistic default; cleared once every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) {
    goto _OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver >= 2) {
    // Two length-prefixed blobs that are skipped, not stored.
    int16_t skipLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
280

281
/**
 * SDB insert hook for dnode rows: marks the dnode as "status not received"
 * and derives its "fqdn:port" endpoint string.
 *
 * @param pSdb   unused.
 * @param pDnode row being inserted; offlineReason and ep are overwritten.
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the row.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
290

291
/**
 * SDB delete hook for dnode rows; only traces the event.
 *
 * @param pSdb   unused.
 * @param pDnode row being deleted.
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t result = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return result;
}
295

296
/**
 * SDB update hook for dnode rows: copies the mutable fields of the new row
 * into the existing one.
 *
 * updateTime is always taken over; machineId is taken over only when
 * TD_ENTERPRISE or TD_ASTRA_TODO is defined.
 *
 * @param pSdb unused.
 * @param pOld row currently stored; modified in place.
 * @param pNew row carrying the new values.
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
304

305
/**
 * Looks up a dnode by id in the SDB and returns it as acquired by
 * sdbAcquire(); pair with mndReleaseDnode().
 *
 * On failure the generic SDB error in terrno is translated into the
 * dnode-specific one (not exist / in creating / in dropping). Any other
 * error is logged as fatal and replaced by TSDB_CODE_APP_ERROR.
 *
 * @param pMnode  mnode owning the SDB.
 * @param dnodeId id of the dnode to acquire.
 * @return the dnode object, or NULL with terrno set.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    // Log while terrno still holds the real cause, then normalize it.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    terrno = TSDB_CODE_APP_ERROR;
  }

  return NULL;
}
323

324
/**
 * Hands a dnode object back to the SDB via sdbRelease().
 *
 * @param pMnode mnode owning the SDB.
 * @param pDnode object to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
155,904,321✔
328

329
/**
 * Builds an endpoint set containing the dnode's fqdn/port.
 *
 * The result of addEpIntoEpSet() is stored in terrno, so callers can inspect
 * terrno to learn whether the endpoint was added.
 *
 * @param pDnode dnode whose endpoint is used.
 * @return the endpoint set, by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet  result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = rc;
  return result;
}
334

335
/**
 * Same as mndGetDnodeEpset(), but looks the dnode up by id first.
 *
 * @param pMnode  mnode owning the SDB.
 * @param dnodeId id of the dnode.
 * @return the dnode's endpoint set, or a zeroed set when the dnode cannot be
 *         acquired (terrno is then the one set by mndAcquireDnode()).
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);

  if (pTarget != NULL) {
    result = mndGetDnodeEpset(pTarget);
    mndReleaseDnode(pMnode, pTarget);
  }

  return result;
}
345

346
/**
 * Scans the dnode table with sdbFetch() for a dnode whose endpoint string
 * equals pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * Non-matching rows are released as the scan advances. The matching row is
 * returned without being released.
 * NOTE(review): the caller is presumably expected to release it -- confirm.
 *
 * @param pMnode mnode owning the SDB.
 * @param pEpStr endpoint string to look for.
 * @return the matching dnode, or NULL with terrno set to
 *         TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      // Table exhausted without a match.
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
366

367
/**
 * Variant of mndAcquireDnodeByEp() that iterates with sdbFetchAll(), which
 * also reports each row's status (the status is not examined here).
 *
 * Non-matching rows are released; the matching row is returned unreleased.
 * Unlike mndAcquireDnodeByEp(), terrno is not touched when nothing matches.
 *
 * @param pMnode mnode owning the SDB.
 * @param pEpStr endpoint string to look for (case-insensitive).
 * @return the matching dnode, or NULL when none matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;
    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &rowStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
387

388
/**
 * @param pMnode mnode owning the SDB.
 * @return sdbGetSize() of the SDB_DNODE table.
 */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
392

393
/**
 * @param pMnode mnode owning the SDB.
 * @return sdbGetSize() of the SDB_DB table.
 */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
397

398
/**
 * Decides whether a dnode counts as online at time curMs.
 *
 * The dnode is online when the absolute distance between its lastAccessTime
 * and curMs does not exceed tsStatusTimeoutMs. Side effect: when it is found
 * offline, has a positive rebootTime and is still marked DND_REASON_ONLINE,
 * its offlineReason becomes DND_REASON_STATUS_MSG_TIMEOUT.
 *
 * @param pDnode dnode to check; offlineReason may be updated.
 * @param curMs  current time in milliseconds.
 * @return true when online, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t silenceMs = TABS(pDnode->lastAccessTime - curMs);

  if (silenceMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
408

409
/**
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * row returned by sdbFetchAll() on the dnode table.
 *
 * A failed array push is logged and the scan continues (best effort).
 *
 * @param pMnode    mnode owning the SDB.
 * @param pDnodeEps array of SDnodeEp receiving the entries.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the row before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: the row must not be touched after sdbRelease().
    dnodeEp.isMnode = 0;
    if (mndIsMnode(pMnode, dnodeEp.id)) {
      dnodeEp.isMnode = 1;
    }
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,735,589✔
435

436
/**
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every row returned by sdbFetchAll() on the dnode table.
 *
 * @param pMnode     mnode owning the SDB.
 * @param pDnodeInfo array of SDnodeInfo receiving the entries.
 * @return 0 on success; terrno of the failed taosArrayPush() otherwise (the
 *         scan is cancelled and entries pushed so far stay in the array).
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialized so no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: the row must not be touched after sdbRelease().
    if (mndIsMnode(pMnode, dInfo.id)) {
      dInfo.isMnode = 1;
    } else {
      dInfo.isMnode = 0;
    }

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
468

469
/*
 * Compares pCfg->monitorParas.<para> with the local global <para>. On a
 * mismatch it logs an error, sets terrno to err and returns err from the
 * enclosing function. Expects `pCfg` and `pDnode` to be in scope at the
 * expansion site. Wrapped in do { } while (0) so it behaves as a single
 * statement; invoke it with a trailing semicolon.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = err;                                                                                        \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
475

476
/**
 * Verifies that the cluster configuration reported by a dnode matches the
 * local (mnode side) settings.
 *
 * Checks, in order: the monitor parameters, tsSlowLogExceptDb, timezone
 * (a mismatch counts only if checkTime differs as well), locale, charset,
 * ttlChangeOnWrite, enableWhiteList and, unless an encryption operation is
 * in progress, the encryption key state/checksum. The first mismatch is
 * logged, terrno is set and the matching reason is returned.
 *
 * @param pMnode local mnode (checkTime and encryptMgmt are read).
 * @param pDnode reporting dnode; only its id is used, for logging.
 * @param pCfg   configuration carried by the dnode.
 * @return DND_REASON_ONLINE when everything matches, otherwise the reason
 *         code of the first mismatch.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Monitor parameters: each macro returns from this function on a mismatch.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
   * The statusInterval check is currently disabled:
   *
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  // A timezone difference is tolerated while both sides agree on checkTime.
  if ((taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pCfg->checkTime != pMnode->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalize the local switch to 0/1 before comparing.
  int8_t enable = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != enable) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // The key comparison is skipped while encryptMgmt.encrypting is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
541

542
/**
 * Average increase of a counter per millisecond between two samples.
 *
 * @param currentCount  counter value of the newer sample.
 * @param lastCount     counter value of the older sample.
 * @param currentTimeMs timestamp of the newer sample, in milliseconds.
 * @param lastTimeMs    timestamp of the older sample, in milliseconds.
 * @return (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0
 *         when either the time or the counter did not strictly increase.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t elapsedMs = currentTimeMs - lastTimeMs;
    int64_t progressed = currentCount - lastCount;
    return (double)progressed / (double)elapsedMs;
  }
  return 0.0;
}
552

553
/**
 * Folds the load reported for one vnode replica into the mnode's record of
 * that replica.
 *
 * Always refreshed: applied/commit index and buffer segment usage. While the
 * commit index is ahead of the applied index, the applied rate and its
 * reference timestamp are maintained as well. Sync state, term, restore and
 * canRead flags, start time and role time are overwritten (and logged) only
 * when one of them changed.
 *
 * @param vgId   vgroup id, used for logging.
 * @param pGid   stored replica state; updated in place.
 * @param pVload state reported by the dnode.
 * @return true when the role/restore/canRead/startTime state changed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // Evaluated up front, before any field of pGid is overwritten.
  bool termChanged = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  bool roleChanged =
      (pGid->syncState != pVload->syncState) || termChanged || (pGid->roleTimeMs != pVload->roleTimeMs);

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First observation of a lagging replica: only start the clock.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate =
          calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  bool unchanged = !roleChanged && pGid->syncRestore == pVload->syncRestore &&
                   pGid->syncCanRead == pVload->syncCanRead && pGid->startTimeMs == pVload->startTimeMs;
  if (unchanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
592

593
/**
 * Folds the load reported for an mnode into the stored mnode object.
 *
 * Sync state, term, restore flag and role time are overwritten (and logged)
 * only when the state, the term (if the reported term is not -1), the role
 * time or the restore flag differs.
 *
 * @param pObj   stored mnode object; updated in place.
 * @param pMload state reported by the dnode.
 * @return true when anything was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  bool termChanged = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  bool roleChanged =
      (pObj->syncState != pMload->syncState) || termChanged || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
610

611
extern char   *tsMonFwUri;
612
extern char   *tsMonSlowLogUri;
613
/**
 * Handles TDMT_MND_STATIS: deserializes the statistics payload and passes
 * its content to monSendContent() with the URI selected by the type
 * (tsMonFwUri for counters, tsMonSlowLogUri for slow logs). Other types are
 * ignored.
 *
 * @param pReq incoming RPC message.
 * @return 0 on success; a deserialization failure is returned through
 *         TAOS_CHECK_RETURN.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1;
  SStatisReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &req));

  // Payload dump, only when protocol logging is switched on.
  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", req.pCont);
  }

  if (req.type == MONITOR_TYPE_COUNTER) {
    monSendContent(req.pCont, tsMonFwUri);
  } else if (req.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(req.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&req);
  return 0;
}
633

634
/**
 * Handles TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete
 * are set, deserializes the audit request and records it with
 * auditAddRecord(). Otherwise the message is accepted and ignored.
 *
 * @param pReq incoming RPC message.
 * @return 0 on success or when auditing is off; a deserialization failure is
 *         returned through TAOS_CHECK_RETURN.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &req));

  mDebug("received audit req:%s, %s, %s, %s", req.operation, req.db, req.table, req.pSql);

  auditAddRecord(pReq, pMnode->clusterId, req.operation, req.db, req.table, req.pSql, req.sqlLen);

  tFreeSAuditReq(&req);
  return 0;
}
651

652
/**
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue,
 * carrying the dnode's id and machine id.
 *
 * @param pMnode mnode whose message callbacks are used for queuing.
 * @param pDnode dnode whose id/machineId are sent.
 * @return 0 on success, otherwise an error code; serialization failures map
 *         to the serializer's result, or TSDB_CODE_OUT_OF_MEMORY when that
 *         result is 0.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Never report success here (contLen may be 0), and do not leak the
    // buffer that was never handed to the queue.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): assumes tmsgPutToQueue() disposes of pCont itself when it fails -- confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
682

683
/**
 * Handles TDMT_MND_UPDATE_DNODE_INFO: refreshes the dnode's updateTime and
 * persists the dnode object through an "update-dnode-obj" transaction.
 *
 * Each "returned NULL" path leaves with a non-zero code even if the callee
 * did not set terrno (falling back to TSDB_CODE_MND_RETURN_VALUE_NULL), so
 * a NULL dnode, transaction or raw record is never used afterwards.
 *
 * @param pReq incoming RPC message carrying an SDnodeInfoReq.
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL);
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the sdbFreeRaw() below does not free the raw appended above.
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
729

730
/*
 * Handle a periodic status (heartbeat) message sent by a dnode.
 *
 * Steps, in order:
 *   1. Deserialize the request and reject it when it names a different cluster.
 *   2. Resolve the dnode object by id, falling back to its endpoint.
 *   3. Reject the message when the sender's clock differs from the local one by
 *      tsTimestampDeltaLimit seconds or more.
 *   4. Copy the reported vnode / mnode / qnode loads into the in-memory objects.
 *   5. When anything relevant changed (needCheck), validate version, cluster id and cluster
 *      configuration, refresh the dnode's attributes and build an SStatusRsp for the reply.
 *   6. Update access statistics and finish with mndUpdClusterInfo().
 *
 * Returns `code` when it is non-zero at _OVER, otherwise the result of mndUpdClusterInfo().
 *
 * NOTE(review): several early exits below set only `terrno` and leave `code` untouched, so what
 * they return depends on the value `code` had at that point. They are kept as they were; please
 * confirm whether those paths are meant to report an error to the sender.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The sender has no id yet: it can only be identified by its endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;  // saved because the lookups below may overwrite terrno
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;  // NOTE(review): only terrno is set here
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;  // NOTE(review): only terrno is set here
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;  // NOTE(review): code is not set here
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  // A full check (and a response body) is only needed when something the dnode cares about changed.
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Clock difference in whole seconds. The absolute value is computed in 64 bits on purpose:
  // labs() takes a `long`, which is only 32 bits wide on some supported platforms.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  int64_t absDelta = (delta < 0) ? -delta : delta;
  if (absDelta >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }

  // Apply the per-vnode loads reported by this dnode.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Group-wide statistics are only taken from the replica that reports itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;  // NOTE(review): only terrno is set here
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;  // NOTE(review): only terrno is set here
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;  // NOTE(review): only terrno is set here
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;  // NOTE(review): only terrno is set here
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;  // NOTE(review): only terrno is set here
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    // First pass only computes the encoded size; do not allocate with a failed (negative) size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never attached to the request, so it has to be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
972

973
/*
 * Handle a notify message from a dnode.
 *
 * For every vnode load entry in the request, the reported time-series count is stored on the
 * matching vgroup (entries whose vgroup cannot be acquired are ignored), then
 * mndUpdClusterInfo() is invoked and its result returned.
 *
 * A request that cannot be deserialized, or that names a different non-zero cluster id, is
 * rejected with the corresponding error code. The request is always freed before returning.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SNotifyReq notifyReq = {0};
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t localClusterId = mndGetClusterId(pMnode);
  bool    foreignCluster = (notifyReq.clusterId != 0) && (notifyReq.clusterId != localClusterId);
  if (foreignCluster) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  int32_t numLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t idx = 0; idx < numLoads; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      continue;
    }

    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1006

1007
/*
 * Create a new dnode object through a "create-dnode" transaction.
 *
 * The object receives the next id from sdbGetMaxId(), the current time as both creation and
 * update time, and the fqdn/port taken from pCreate. Its encoded form is appended to the
 * transaction as a commit log with status SDB_STATUS_READY.
 *
 * Returns 0 when the transaction was prepared, otherwise the error code of the failing step.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;          // encoded row; freed at _OVER while it is still owned here
  SSdbRaw *pAppendedRaw = NULL;  // same row once appended to pTrans; never freed directly
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  // The row is released together with the transaction from here on (the cleanup below calls both
  // mndTransDrop and sdbFreeRaw), so give up local ownership before the next step can fail.
  pAppendedRaw = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppendedRaw, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1047

1048
/*
 * Answer a "dnode list" request with the id and endpoint (fqdn + port) of every dnode in the sdb.
 *
 * On success the serialized SDnodeListRsp is attached to pReq->info.rsp / rspLen and 0 is
 * returned. On failure nothing is attached, the response buffer (if any) is released and a
 * non-zero code is returned.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;  // response buffer; owned here until it is handed over to pReq
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    // Never report success for a failed allocation, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded size; do not allocate with a failed size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    if (rspLen < 0) code = rspLen;  // a zero length keeps the generic failure code
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // A zero length must not be mistaken for success: keep the generic failure code in that case.
    if (rspLen < 0) code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // ownership moved to the request
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1109

1110
void getSlowLogScopeString(int32_t scope, char *result) {
2,499✔
1111
  if (scope == SLOW_LOG_TYPE_NULL) {
2,499✔
1112
    (void)strncat(result, "NONE", 64);
×
1113
    return;
×
1114
  }
1115
  while (scope > 0) {
4,998✔
1116
    if (scope & SLOW_LOG_TYPE_QUERY) {
2,499✔
1117
      (void)strncat(result, "QUERY", 64);
2,499✔
1118
      scope &= ~SLOW_LOG_TYPE_QUERY;
2,499✔
1119
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1120
      (void)strncat(result, "INSERT", 64);
×
1121
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1122
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1123
      (void)strncat(result, "OTHERS", 64);
×
1124
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1125
    } else {
1126
      (void)printf("invalid slow log scope:%d", scope);
×
1127
      return;
×
1128
    }
1129

1130
    if (scope > 0) {
2,499✔
1131
      (void)strncat(result, "|", 64);
×
1132
    }
1133
  }
1134
}
1135

1136
/*
 * Handle a "create dnode" request.
 *
 * Checks, in order: the dnode and cpu-core grants, that the request deserializes, the caller's
 * privilege for MND_OPER_CREATE_DNODE, that fqdn is non-empty and the port lies in
 * 1..UINT16_MAX, and that no dnode with the same "fqdn:port" endpoint exists. It then starts the
 * creation transaction via mndCreateDnode() and writes an audit record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise the error
 * code of the failing step. The FQDN itself is not resolved or validated here.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  SMnode         *pMnode = pReq->info.node;
  int32_t         lino = 0;
  int32_t         code = grantCheck(TSDB_GRANT_DNODE);

  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), &lino, _OVER);

  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  // Refuse to create a second dnode on an endpoint that is already registered.
  char dnodeEp[TSDB_EP_LEN];
  (void)snprintf(dnodeEp, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, dnodeEp);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    tsGrantHBInterval = 5;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // The audit record is written whether or not the transaction could be started.
  char auditObj[200] = {0};
  (void)tsnprintf(auditObj, sizeof(auditObj), "%s:%d", createReq.fqdn, createReq.port);
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", auditObj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1193

1194
// Real implementation of the restore-dnode handler. When TD_ENTERPRISE is not defined, the stub
// at the end of this section is used instead (presumably the enterprise build supplies its own).
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Handle a "restore dnode" request by forwarding it unchanged to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  int32_t result = mndProcessRestoreDnodeReqImpl(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
// Stub: accepts the request and reports success without doing anything.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1201

1202
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
13,004✔
1203
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1204
  int32_t  code = -1;
13,004✔
1205
  SSdbRaw *pRaw = NULL;
13,004✔
1206
  STrans  *pTrans = NULL;
13,004✔
1207
  int32_t  lino = 0;
13,004✔
1208

1209
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
13,004✔
1210
  if (pTrans == NULL) {
13,004✔
1211
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1212
    if (terrno != 0) code = terrno;
×
1213
    goto _OVER;
×
1214
  }
1215
  mndTransSetGroupParallel(pTrans);
13,004✔
1216
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
13,004✔
1217
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
13,004✔
1218

1219
  pRaw = mndDnodeActionEncode(pDnode);
13,004✔
1220
  if (pRaw == NULL) {
13,004✔
1221
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1222
    if (terrno != 0) code = terrno;
×
1223
    goto _OVER;
×
1224
  }
1225
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
13,004✔
1226
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
13,004✔
1227
  pRaw = NULL;
13,004✔
1228

1229
  pRaw = mndDnodeActionEncode(pDnode);
13,004✔
1230
  if (pRaw == NULL) {
13,004✔
1231
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1232
    if (terrno != 0) code = terrno;
×
1233
    goto _OVER;
×
1234
  }
1235
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
13,004✔
1236
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
13,004✔
1237
  pRaw = NULL;
13,004✔
1238

1239
  if (pSObj != NULL) {
13,004✔
1240
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,110✔
1241
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
1,110✔
1242
  }
1243

1244
  if (pMObj != NULL) {
13,004✔
1245
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
620✔
1246
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
620✔
1247
  }
1248

1249
  if (pQObj != NULL) {
13,004✔
1250
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
343✔
1251
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
343✔
1252
  }
1253

1254
  if (pBObj != NULL) {
13,004✔
1255
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,322✔
1256
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
1,322✔
1257
  }
1258

1259
  if (numOfVnodes > 0) {
11,682✔
1260
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,465✔
1261
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,465✔
1262
  }
1263

1264
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
11,682✔
1265

1266
  code = 0;
11,682✔
1267

1268
_OVER:
13,004✔
1269
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
13,004✔
1270
  mndTransDrop(pTrans);
13,004✔
1271
  sdbFreeRaw(pRaw);
13,004✔
1272
  TAOS_RETURN(code);
13,004✔
1273
}
1274

1275
/*
 * Tell whether a dnode hosts nothing: no qnode, no snode, no mnode and no vnodes.
 *
 * The checks run in that order and stop at the first component found. Every object acquired
 * along the way is released before returning.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SMnodeObj *pMnodeObj = NULL;
  SSnodeObj *pSnodeObj = NULL;
  SQnodeObj *pQnodeObj = mndAcquireQnode(pMnode, dnodeId);

  if (pQnodeObj == NULL) {
    pSnodeObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSnodeObj == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnodeObj);
  mndReleaseSnode(pMnode, pSnodeObj);
  return empty;
}
1300

1301
/*
 * Handle a "drop dnode" request.
 *
 * The dnode is looked up by the id in the request and, failing that, by its "fqdn:port"
 * endpoint. The drop is refused when
 *   - the dnode hosts the only mnode, or the mnode on it is this (leader) mnode,
 *   - a mount exists on the dnode and the drop is not forced (USE_MOUNT builds only),
 *   - the dnode is online and the drop is forced,
 *   - one of its vnodes is offline and the drop is not forced,
 *   - the dnode is offline and the drop is not forced.
 * `unsafe` implies `force`. Otherwise the drop transaction is started through mndDropDnode() and
 * an audit record is written.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): this checks the *mnode* drop operation although a dnode is being dropped, while
  // the create path checks MND_OPER_CREATE_DNODE. Confirm whether a dnode-specific operation
  // should be used here.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;  // saved because the endpoint lookup may overwrite terrno
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a fixed format; the fqdn comes from the request and must never be
    // interpreted as a format string.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      // Never report success for a dnode that could not be found.
      code = (err != 0) ? err : TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually found: after the endpoint fallback it can differ
  // from the id carried in the request.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // Diagnostic output only, hence debug level rather than error level.
  mDebug("vnode num:%d", numOfVnodes);

  // Look for a vnode of this dnode that is currently offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mDebug("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early: the fetch has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record is written whether or not the transaction could be started.
  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1431

1432
/*
 * Send a TDMT_DND_CREATE_ENCRYPT_KEY request to the selected online dnodes.
 *
 * pReq     - originating request; supplies the mnode and the trace id used by the mG* log macros.
 * dnodeId  - target dnode id; -1 or 0 selects every dnode.
 * pDcfgReq - config request whose `value` holds the key; its length must lie in
 *            [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
 *
 * Returns 0 on success or an error code. A non-zero code is also stored in terrno when terrno is
 * still 0. A failed tmsgSendReq is not reported; the dnode is simply not counted in nEncrypt.
 *
 * encryptMgmt.encrypting is taken with a compare-and-swap and is cleared here when no request was
 * sent; otherwise the response handler is expected to clear it.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Only one key-creation round may be in flight; the flag is owned by whoever swaps 0 -> 1.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the rpc layer, so it has to be released here.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Leaving the scan early: cancel the iterator, then fall through to the nEncrypt check
          // below so the `encrypting` flag is not left set when nothing was sent.
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request went out, so no response will ever arrive to clear the flag.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1509

1510
/*
 * Handle a client request to create the encryption key.
 *
 * When built with TD_ENTERPRISE or TD_ASTRA_TODO: deserializes the SMCfgDnodeReq, checks the
 * MND_OPER_CONFIG_DNODE privilege, and, if the config name is "encrypt_key", forwards the
 * config/value pair to mndProcessCreateEncryptKeyReqImpl. Any other config name yields
 * TSDB_CODE_PAR_INTERNAL_ERROR. In other builds the function returns 0 without doing anything.
 *
 * The deserialized request is freed on every path, including a failed deserialization.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  SDCfgDnodeReq dcfgReq = {0};

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 includes the terminating NUL, so only an exact (case-insensitive) match passes.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy everything still needed out of cfgReq before it is freed.
  int32_t dnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);

  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1539

1540
/*
 * Handle one dnode's response to a create-encrypt-key request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code, then clears
 * encryptMgmt.encrypting once the two counters together reach the number of requests sent
 * (encryptMgmt.nEncrypt). Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;

  if (0 == pRsp->code) {
    (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
  } else {
    (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
  }

  // Read BOTH counters: completion depends on the total number of responses, not only on the
  // counter this particular response incremented. Otherwise a mix of successes and failures
  // would never add up to nReq and `encrypting` would stay set forever.
  int16_t nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  int16_t nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1564

1565
/*
 * Fill pBlock with one (name, value) row per cluster-level config item: statusIntervalMs,
 * timezone, locale, charset, monitor, monitorInterval, slowLogThreshold, slowLogMaxLen and
 * slowLogScope.
 *
 * Returns the number of rows written and adds the same amount to pShow->numOfRows. If writing a
 * cell fails, the error is logged and the rows written so far are still reported.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nCfg = 0;     // number of config items collected
  int32_t nFilled = 0;  // number of rows written into pBlock
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char    scopeStr[64] = {0};
  char    nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char    valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  names[nCfg] = "statusIntervalMs";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
  ++nCfg;

  names[nCfg] = "timezone";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  ++nCfg;

  names[nCfg] = "locale";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  ++nCfg;

  names[nCfg] = "charset";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  ++nCfg;

  names[nCfg] = "monitor";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  ++nCfg;

  names[nCfg] = "monitorInterval";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  ++nCfg;

  names[nCfg] = "slowLogThreshold";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  ++nCfg;

  names[nCfg] = "slowLogMaxLen";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  ++nCfg;

  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  names[nCfg] = "slowLogScope";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
  ++nCfg;

  for (int32_t idx = 0; idx < nCfg; ++idx) {
    SColumnInfoData *pCol = NULL;

    // column 0: option name
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, names[idx], TSDB_CONFIG_OPTION_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)nameVar, false), &lino, _OVER);

    // column 1: option value
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, values[idx], TSDB_CONFIG_VALUE_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)valueVar, false), &lino, _OVER);

    nFilled++;
  }

_OVER:
  if (code != 0) {
    mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  }
  pShow->numOfRows += nFilled;
  return nFilled;
}
1636

1637
/* Cancel callback for the configs retrieval; intentionally does nothing. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1638

1639
/*
 * Fill pBlock with up to `rows` dnode rows, resuming from pShow->pIter.
 *
 * Columns written per dnode, in order: id, endpoint, vnode count, supported vnodes, status,
 * create time, reboot time, offline reason and (TD_ENTERPRISE builds only) machine id.
 *
 * Returns the number of rows written and adds the same amount to pShow->numOfRows. On a cell
 * write failure the error is logged and the rows written so far are still reported.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // `cols` already points at the endpoint column here, so its schema width bounds the copy.
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // A trailing '*' marks a dnode that is being created/dropped while it is not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
    if (b == NULL) {
      // Allocation failed: bail out through the same error path as a failed cell write.
      // The fallback code guards against terrno not having been set by the allocator.
      RETRIEVE_CHECK_GOTO(terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the scratch buffer BEFORE the check so it is not leaked when the write fails.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1721

1722
/* Cancel callback for the dnodes retrieval: cancels the pending SDB_DNODE fetch iterator. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1726

1727
/*
 * Collect a heap-allocated copy of every dnode's fqdn.
 *
 * Returns an array of `char *` (one taosStrdup'ed string per dnode), or NULL when the array
 * cannot be created or an fqdn cannot be duplicated. The returned array and its strings are
 * handed to the caller.
 *
 * A failed push into the array is logged and skipped on purpose; the collected list then simply
 * lacks that entry.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  bool       failed = false;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Log while pObj is still held, then release it and cancel the unfinished scan.
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      failed = true;
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so nobody else will free this copy.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (failed) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1769

1770
/*
 * Map an sdb object of the given type to its dnode object.
 *
 * An SDB_DNODE object is returned as-is; SDB_QNODE, SDB_SNODE and SDB_BNODE objects yield their
 * pDnode member. Returns NULL when p is NULL or the type is not one of those four.
 */
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pDnode = NULL;

  if (p != NULL) {
    switch (type) {
      case SDB_DNODE:
        pDnode = (SDnodeObj *)p;
        break;
      case SDB_QNODE:
        pDnode = ((SQnodeObj *)p)->pDnode;
        break;
      case SDB_SNODE:
        pDnode = ((SSnodeObj *)p)->pDnode;
        break;
      case SDB_BNODE:
        pDnode = ((SBnodeObj *)p)->pDnode;
        break;
      default:
        break;
    }
  }

  return pDnode;
}
1787
/*
 * Append the endpoint set of every sdb object of `type` to pAddr.
 *
 * Objects that cannot be mapped to a dnode are logged and skipped. Returns 0 on success, or a
 * non-zero error code as soon as an endpoint set cannot be pushed into pAddr; entries pushed
 * before the failure stay in pAddr.
 */
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Capture the error first; fall back to a generic code so a failure is never reported as 0.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      // Release the object exactly once and cancel the unfinished scan before leaving.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
1817

1818
/*
 * Append the endpoint sets of all qnodes, snodes, bnodes and dnodes (in that order) to pAddr.
 *
 * Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the first non-zero code
 * reported by mndGetAllNodeAddrByType, or 0 when every type was collected.
 */
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  int32_t               code = 0;
  int32_t               lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  for (size_t idx = 0; idx < sizeof(nodeTypes) / sizeof(nodeTypes[0]); ++idx) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, nodeTypes[idx], pAddr), &lino, _error);
  }

_error:
  return code;
}
1840

1841
/*
 * Handle a request to reload the TLS configuration on the cluster's nodes.
 *
 * Deserializes the request, checks the MND_OPER_ALTER_DNODE_RELOAD_TLS privilege, collects the
 * endpoint sets of all nodes, and sends an empty TDMT_DND_RELOAD_DNODE_TLS message to each one.
 *
 * Returns a non-zero code when deserialization, the privilege check or address collection fails.
 * Otherwise returns the result of the last send; individual send failures are logged and do not
 * stop the loop.
 */
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  // Initialized up front: every `goto _OVER` below reaches taosArrayDestroy(pAddr).
  SArray       *pAddr = NULL;
  SMCfgDnodeReq req = {0};

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    code = terrno;
    goto _OVER;
  }

  // Do not broadcast to a partially collected address list; report the failure instead.
  if ((code = mndGetAllNodeAddr(pMnode, pAddr)) != 0) {
    mError("failed to get all node addrs for reloading tls since %s", tstrerror(code));
    goto _OVER;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    code = tmsgSendReq(pEpSet, &rpcMsg);
    if (code != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
1880

1881
/*
 * Handle a node's response to a reload-TLS request: log the outcome carried in pRsp->code.
 * Always returns 0.
 */
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc