• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.77
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
static const char *offlineReason[] = {
42
    "",
43
    "status msg timeout",
44
    "status not received",
45
    "version not match",
46
    "dnodeId not match",
47
    "clusterId not match",
48
    "statusInterval not match",
49
    "timezone not match",
50
    "locale not match",
51
    "charset not match",
52
    "ttlChangeOnWrite not match",
53
    "enableWhiteList not match",
54
    "encryptionKey not match",
55
    "monitor not match",
56
    "monitor switch not match",
57
    "monitor interval not match",
58
    "monitor slow log threshold not match",
59
    "monitor slow log sql max len not match",
60
    "monitor slow log scope not match",
61
    "unknown",
62
};
63

64
enum {
65
  DND_ACTIVE_CODE,
66
  DND_CONN_ACTIVE_CODE,
67
};
68

69
enum {
70
  DND_CREATE,
71
  DND_ADD,
72
  DND_DROP,
73
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
#ifdef _GRANT
100
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
101
#else
102
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
103
#endif
104

105
int32_t mndInitDnode(SMnode *pMnode) {
2,477✔
106
  SSdbTable table = {
2,477✔
107
      .sdbType = SDB_DNODE,
108
      .keyType = SDB_KEY_INT32,
109
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
110
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
111
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
112
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
113
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
114
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
115
  };
116

117
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
2,477✔
118
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
2,477✔
119
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
2,477✔
120
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
2,477✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
2,477✔
122
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
2,477✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
2,477✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
2,477✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
2,477✔
126
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
2,477✔
127
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
2,477✔
128

129
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
2,477✔
130
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
2,477✔
131
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
2,477✔
132
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
2,477✔
133

134
  return sdbSetTable(pMnode->pSdb, table);
2,477✔
135
}
136

137
void mndCleanupDnode(SMnode *pMnode) {}
2,476✔
138

139
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
1,870✔
140
  int32_t  code = -1;
1,870✔
141
  SSdbRaw *pRaw = NULL;
1,870✔
142
  STrans  *pTrans = NULL;
1,870✔
143

144
  SDnodeObj dnodeObj = {0};
1,870✔
145
  dnodeObj.id = 1;
1,870✔
146
  dnodeObj.createdTime = taosGetTimestampMs();
1,870✔
147
  dnodeObj.updateTime = dnodeObj.createdTime;
1,870✔
148
  dnodeObj.port = tsServerPort;
1,870✔
149
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
1,870✔
150
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
1,870✔
151
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
1,870✔
152
  char *machineId = NULL;
1,870✔
153
  code = tGetMachineId(&machineId);
1,870✔
154
  if (machineId) {
1,870!
155
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
1,870✔
156
    taosMemoryFreeClear(machineId);
1,870!
157
  } else {
158
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
159
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
160
    goto _OVER;
×
161
#endif
162
  }
163

164
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
1,870✔
165
  if (pTrans == NULL) {
1,870!
166
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
167
    if (terrno != 0) code = terrno;
×
168
    goto _OVER;
×
169
  }
170
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
1,870!
171

172
  pRaw = mndDnodeActionEncode(&dnodeObj);
1,870✔
173
  if (pRaw == NULL) {
1,870!
174
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
175
    if (terrno != 0) code = terrno;
×
176
    goto _OVER;
×
177
  }
178
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
1,870!
179
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
1,870!
180
  pRaw = NULL;
1,870✔
181

182
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
1,870!
183
  code = 0;
1,870✔
184
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
1,870✔
185
                                   1);  // TODO: check the return value
186

187
_OVER:
1,870✔
188
  mndTransDrop(pTrans);
1,870✔
189
  sdbFreeRaw(pRaw);
1,870✔
190
  return code;
1,870✔
191
}
192

193
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
14,541✔
194
  int32_t code = 0;
14,541✔
195
  int32_t lino = 0;
14,541✔
196
  terrno = TSDB_CODE_OUT_OF_MEMORY;
14,541✔
197

198
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
14,541✔
199
  if (pRaw == NULL) goto _OVER;
14,541!
200

201
  int32_t dataPos = 0;
14,541✔
202
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
14,541!
203
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
14,541!
204
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
14,541!
205
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
14,541!
206
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
14,541!
207
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
14,541!
208
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
14,541!
209
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
14,541!
210
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
14,541!
211
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
14,541!
212

213
  terrno = 0;
14,541✔
214

215
_OVER:
14,541✔
216
  if (terrno != 0) {
14,541!
217
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
218
    sdbFreeRaw(pRaw);
×
219
    return NULL;
×
220
  }
221

222
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
14,541✔
223
  return pRaw;
14,541✔
224
}
225

226
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
11,187✔
227
  int32_t code = 0;
11,187✔
228
  int32_t lino = 0;
11,187✔
229
  terrno = TSDB_CODE_OUT_OF_MEMORY;
11,187✔
230
  SSdbRow   *pRow = NULL;
11,187✔
231
  SDnodeObj *pDnode = NULL;
11,187✔
232

233
  int8_t sver = 0;
11,187✔
234
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
11,187!
235
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
11,187!
236
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
237
    goto _OVER;
×
238
  }
239

240
  pRow = sdbAllocRow(sizeof(SDnodeObj));
11,187✔
241
  if (pRow == NULL) goto _OVER;
11,187!
242

243
  pDnode = sdbGetRowObj(pRow);
11,187✔
244
  if (pDnode == NULL) goto _OVER;
11,187!
245

246
  int32_t dataPos = 0;
11,187✔
247
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
11,187!
248
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
11,187!
249
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
11,187!
250
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
11,187!
251
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
11,187!
252
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
11,187!
253
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
11,187!
254
  if (sver > 1) {
11,187!
255
    int16_t keyLen = 0;
11,187✔
256
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
11,187!
257
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
11,187!
258
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
11,187!
259
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
11,187!
260
  }
261

262
  terrno = 0;
11,187✔
263
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
11,187!
264
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
265
  }
266

267
_OVER:
11,187✔
268
  if (terrno != 0) {
11,187!
269
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
270
    taosMemoryFreeClear(pRow);
×
271
    return NULL;
×
272
  }
273

274
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
11,187✔
275
  return pRow;
11,187✔
276
}
277

278
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
4,463✔
279
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
4,463✔
280
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
4,463✔
281

282
  char ep[TSDB_EP_LEN] = {0};
4,463✔
283
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
4,463✔
284
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
4,463✔
285
  return 0;
4,463✔
286
}
287

288
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
11,186✔
289
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
11,186✔
290
  return 0;
11,186✔
291
}
292

293
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
6,676✔
294
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
6,676✔
295
  pOld->updateTime = pNew->updateTime;
6,676✔
296
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
297
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
6,676✔
298
#endif
299
  return 0;
6,676✔
300
}
301

302
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
649,193✔
303
  SSdb      *pSdb = pMnode->pSdb;
649,193✔
304
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
649,193✔
305
  if (pDnode == NULL) {
649,194✔
306
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
875✔
307
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
202✔
308
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
673!
309
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
310
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
673!
311
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
673✔
312
    } else {
313
      terrno = TSDB_CODE_APP_ERROR;
×
314
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
315
    }
316
  }
317

318
  return pDnode;
649,194✔
319
}
320

321
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
654,130✔
322
  SSdb *pSdb = pMnode->pSdb;
654,130✔
323
  sdbRelease(pSdb, pDnode);
654,130✔
324
}
654,133✔
325

326
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
30,965✔
327
  SEpSet epSet = {0};
30,965✔
328
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
30,965✔
329
  return epSet;
30,965✔
330
}
331

332
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
×
333
  SEpSet     epSet = {0};
×
334
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
335
  if (!pDnode) return epSet;
×
336

337
  epSet = mndGetDnodeEpset(pDnode);
×
338

339
  mndReleaseDnode(pMnode, pDnode);
×
340
  return epSet;
×
341
}
342

343
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
5,798✔
344
  SSdb *pSdb = pMnode->pSdb;
5,798✔
345

346
  void *pIter = NULL;
5,798✔
347
  while (1) {
7,636✔
348
    SDnodeObj *pDnode = NULL;
13,434✔
349
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
13,434✔
350
    if (pIter == NULL) break;
13,434✔
351

352
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
10,708✔
353
      sdbCancelFetch(pSdb, pIter);
3,072✔
354
      return pDnode;
3,072✔
355
    }
356

357
    sdbRelease(pSdb, pDnode);
7,636✔
358
  }
359

360
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
2,726✔
361
  return NULL;
2,726✔
362
}
363

364
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
504✔
365
  SSdb *pSdb = pMnode->pSdb;
504✔
366

367
  void *pIter = NULL;
504✔
368
  while (1) {
504✔
369
    SDnodeObj *pDnode = NULL;
1,008✔
370
    ESdbStatus objStatus = 0;
1,008✔
371
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
1,008✔
372
    if (pIter == NULL) break;
1,008!
373

374
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
1,008✔
375
      sdbCancelFetch(pSdb, pIter);
504✔
376
      return pDnode;
504✔
377
    }
378

379
    sdbRelease(pSdb, pDnode);
504✔
380
  }
381

382
  return NULL;
×
383
}
384

385
int32_t mndGetDnodeSize(SMnode *pMnode) {
219,506✔
386
  SSdb *pSdb = pMnode->pSdb;
219,506✔
387
  return sdbGetSize(pSdb, SDB_DNODE);
219,506✔
388
}
389

390
int32_t mndGetDbSize(SMnode *pMnode) {
×
391
  SSdb *pSdb = pMnode->pSdb;
×
392
  return sdbGetSize(pSdb, SDB_DB);
×
393
}
394

395
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
243,223✔
396
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
243,223✔
397
  if (interval > 5000 * (int64_t)tsStatusInterval) {
243,223✔
398
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
11,742✔
399
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
245✔
400
    }
401
    return false;
11,742✔
402
  }
403
  return true;
231,481✔
404
}
405

406
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
12,245✔
407
  SSdb *pSdb = pMnode->pSdb;
12,245✔
408

409
  int32_t numOfEps = 0;
12,245✔
410
  void   *pIter = NULL;
12,245✔
411
  while (1) {
36,687✔
412
    SDnodeObj *pDnode = NULL;
48,932✔
413
    ESdbStatus objStatus = 0;
48,932✔
414
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
48,932✔
415
    if (pIter == NULL) break;
48,932✔
416

417
    SDnodeEp dnodeEp = {0};
36,687✔
418
    dnodeEp.id = pDnode->id;
36,687✔
419
    dnodeEp.ep.port = pDnode->port;
36,687✔
420
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
36,687✔
421
    sdbRelease(pSdb, pDnode);
36,687✔
422

423
    dnodeEp.isMnode = 0;
36,687✔
424
    if (mndIsMnode(pMnode, pDnode->id)) {
36,687✔
425
      dnodeEp.isMnode = 1;
16,785✔
426
    }
427
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
36,687!
428
      mError("failed to put ep into array, but continue at this call");
×
429
    }
430
  }
431
}
12,245✔
432

433
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
67,847✔
434
  SSdb   *pSdb = pMnode->pSdb;
67,847✔
435
  int32_t code = 0;
67,847✔
436

437
  int32_t numOfEps = 0;
67,847✔
438
  void   *pIter = NULL;
67,847✔
439
  while (1) {
307,390✔
440
    SDnodeObj *pDnode = NULL;
375,237✔
441
    ESdbStatus objStatus = 0;
375,237✔
442
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
375,237✔
443
    if (pIter == NULL) break;
375,237✔
444

445
    SDnodeInfo dInfo;
446
    dInfo.id = pDnode->id;
307,390✔
447
    dInfo.ep.port = pDnode->port;
307,390✔
448
    dInfo.offlineReason = pDnode->offlineReason;
307,390✔
449
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
307,390✔
450
    sdbRelease(pSdb, pDnode);
307,390✔
451
    if (mndIsMnode(pMnode, pDnode->id)) {
307,390✔
452
      dInfo.isMnode = 1;
91,872✔
453
    } else {
454
      dInfo.isMnode = 0;
215,518✔
455
    }
456

457
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
307,390!
458
      code = terrno;
×
459
      sdbCancelFetch(pSdb, pIter);
×
460
      break;
×
461
    }
462
  }
463
  TAOS_RETURN(code);
67,847✔
464
}
465

466
#define CHECK_MONITOR_PARA(para, err)                                                                    \
467
  if (pCfg->monitorParas.para != para) {                                                                 \
468
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
469
    terrno = err;                                                                                        \
470
    return err;                                                                                          \
471
  }
472

473
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
12,245✔
474
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
12,245!
475
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
12,245!
476
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
12,245!
477
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
12,245!
478
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
12,245!
479

480
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
12,245!
481
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
482
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
483
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
484
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
485
  }
486

487
  if (pCfg->statusInterval != tsStatusInterval) {
12,245!
488
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
489
           tsStatusInterval);
490
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
491
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
492
  }
493

494
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
12,245!
495
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
496
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
497
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
498
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
499
  }
500

501
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
12,245!
502
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
503
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
504
    return DND_REASON_LOCALE_NOT_MATCH;
×
505
  }
506

507
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
12,245!
508
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
509
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
510
    return DND_REASON_CHARSET_NOT_MATCH;
×
511
  }
512

513
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
12,245!
514
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
515
           tsTtlChangeOnWrite);
516
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
517
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
518
  }
519
  int8_t enable = tsEnableWhiteList ? 1 : 0;
12,245✔
520
  if (pCfg->enableWhiteList != enable) {
12,245!
521
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
522
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
523
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
524
  }
525

526
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
12,245!
527
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
12,245!
528
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
529
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
530
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
531
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
532
  }
533

534
  return DND_REASON_ONLINE;
12,245✔
535
}
536

537
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
120✔
538
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
120!
539
    return 0.0;
22✔
540
  }
541

542
  int64_t deltaCount = currentCount - lastCount;
98✔
543
  int64_t deltaMs = currentTimeMs - lastTimeMs;
98✔
544
  double  rate = (double)deltaCount / (double)deltaMs;
98✔
545
  return rate;
98✔
546
}
547

548
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
243,816✔
549
  bool stateChanged = false;
243,816✔
550
  bool roleChanged = pGid->syncState != pVload->syncState ||
726,679✔
551
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
470,790!
552
                     pGid->roleTimeMs != pVload->roleTimeMs;
226,974✔
553

554
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
243,816✔
555
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
238✔
556
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
118✔
557
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
120!
558
      int64_t currentTimeMs = taosGetTimestampMs();
120✔
559
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
120✔
560
                                          pGid->lastSyncAppliedIndexUpdateTime);
561

562
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
120✔
563
    }
564
  }
565

566
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
243,816✔
567
  pGid->syncCommitIndex = pVload->syncCommitIndex;
243,816✔
568
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
243,816✔
569
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
243,816✔
570
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
243,816!
571
      pGid->startTimeMs != pVload->startTimeMs) {
225,521!
572
    mInfo(
18,295!
573
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
574
        "canRead:%d, dnode:%d",
575
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
576
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
577
    pGid->syncState = pVload->syncState;
18,295✔
578
    pGid->syncTerm = pVload->syncTerm;
18,295✔
579
    pGid->syncRestore = pVload->syncRestore;
18,295✔
580
    pGid->syncCanRead = pVload->syncCanRead;
18,295✔
581
    pGid->startTimeMs = pVload->startTimeMs;
18,295✔
582
    pGid->roleTimeMs = pVload->roleTimeMs;
18,295✔
583
    stateChanged = true;
18,295✔
584
  }
585
  return stateChanged;
243,816✔
586
}
587

588
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
64,844✔
589
  bool stateChanged = false;
64,844✔
590
  bool roleChanged = pObj->syncState != pMload->syncState ||
192,067✔
591
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
127,167!
592
                     pObj->roleTimeMs != pMload->roleTimeMs;
62,323✔
593
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
64,844✔
594
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
2,605!
595
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
596
          pObj->syncTerm, pMload->syncTerm);
597
    pObj->syncState = pMload->syncState;
2,605✔
598
    pObj->syncTerm = pMload->syncTerm;
2,605✔
599
    pObj->syncRestore = pMload->syncRestore;
2,605✔
600
    pObj->roleTimeMs = pMload->roleTimeMs;
2,605✔
601
    stateChanged = true;
2,605✔
602
  }
603
  return stateChanged;
64,844✔
604
}
605

606
extern char   *tsMonFwUri;
607
extern char   *tsMonSlowLogUri;
608
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
276✔
609
  SMnode    *pMnode = pReq->info.node;
276✔
610
  SStatisReq statisReq = {0};
276✔
611
  int32_t    code = -1;
276✔
612

613
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
276!
614

615
  if (tsMonitorLogProtocol) {
276!
616
    mInfo("process statis req,\n %s", statisReq.pCont);
×
617
  }
618

619
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
276✔
620
    monSendContent(statisReq.pCont, tsMonFwUri);
254✔
621
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
22!
622
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
22✔
623
  }
624

625
  tFreeSStatisReq(&statisReq);
276✔
626
  return 0;
276✔
627
}
628

629
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
7,088✔
630
  mTrace("process audit req:%p", pReq);
7,088✔
631
  if (tsEnableAudit && tsEnableAuditDelete) {
7,088!
632
    SMnode   *pMnode = pReq->info.node;
7,088✔
633
    SAuditReq auditReq = {0};
7,088✔
634

635
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
7,088!
636

637
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
7,088✔
638

639
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
7,088✔
640
                   auditReq.sqlLen);
641

642
    tFreeSAuditReq(&auditReq);
7,088✔
643
  }
644
  return 0;
7,088✔
645
}
646

647
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
5,269✔
648
  int32_t       code = 0, lino = 0;
5,269✔
649
  SDnodeInfoReq infoReq = {0};
5,269✔
650
  int32_t       contLen = 0;
5,269✔
651
  void         *pReq = NULL;
5,269✔
652

653
  infoReq.dnodeId = pDnode->id;
5,269✔
654
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
5,269✔
655

656
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
5,269!
657
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
658
  }
659
  pReq = rpcMallocCont(contLen);
5,269✔
660
  if (pReq == NULL) {
5,269!
661
    TAOS_RETURN(terrno);
×
662
  }
663

664
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
5,269!
665
    code = contLen;
×
666
    goto _exit;
×
667
  }
668

669
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
5,269✔
670
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
5,269!
671
_exit:
5,269✔
672
  if (code < 0) {
5,269!
673
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
674
  }
675
  TAOS_RETURN(code);
5,269✔
676
}
677

678
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
5,265✔
679
  int32_t       code = 0, lino = 0;
5,265✔
680
  SMnode       *pMnode = pReq->info.node;
5,265✔
681
  SDnodeInfoReq infoReq = {0};
5,265✔
682
  SDnodeObj    *pDnode = NULL;
5,265✔
683
  STrans       *pTrans = NULL;
5,265✔
684
  SSdbRaw      *pCommitRaw = NULL;
5,265✔
685

686
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
5,265!
687

688
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
5,265✔
689
  if (pDnode == NULL) {
5,265✔
690
    TAOS_CHECK_EXIT(terrno);
1!
691
  }
692

693
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
5,264✔
694
  if (pTrans == NULL) {
5,264!
695
    TAOS_CHECK_EXIT(terrno);
×
696
  }
697

698
  pDnode->updateTime = taosGetTimestampMs();
5,264✔
699

700
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
5,264!
701
    TAOS_CHECK_EXIT(terrno);
×
702
  }
703
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
5,264!
704
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
705
    TAOS_CHECK_EXIT(code);
×
706
  }
707
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
5,264!
708
  pCommitRaw = NULL;
5,264✔
709

710
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
5,264✔
711
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
1!
712
    TAOS_CHECK_EXIT(code);
1!
713
  }
714

715
_exit:
5,263✔
716
  mndReleaseDnode(pMnode, pDnode);
5,265✔
717
  if (code != 0) {
5,265✔
718
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
2!
719
  }
720
  mndTransDrop(pTrans);
5,265✔
721
  sdbFreeRaw(pCommitRaw);
5,265✔
722
  TAOS_RETURN(code);
5,265✔
723
}
724

725
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
108,733✔
726
  SMnode    *pMnode = pReq->info.node;
108,733✔
727
  SStatusReq statusReq = {0};
108,733✔
728
  SDnodeObj *pDnode = NULL;
108,733✔
729
  int32_t    code = -1;
108,733✔
730

731
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
108,733!
732

733
  int64_t clusterid = mndGetClusterId(pMnode);
108,733✔
734
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
108,733!
735
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
736
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
737
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
738
    goto _OVER;
×
739
  }
740

741
  if (statusReq.dnodeId == 0) {
108,733✔
742
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
4,532✔
743
    if (pDnode == NULL) {
4,532✔
744
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
1,467!
745
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
1,467✔
746
      if (terrno != 0) code = terrno;
1,467!
747
      goto _OVER;
1,467✔
748
    }
749
  } else {
750
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
104,201✔
751
    if (pDnode == NULL) {
104,201✔
752
      int32_t err = terrno;
662✔
753
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
662✔
754
      if (pDnode != NULL) {
662✔
755
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
7✔
756
        terrno = err;
7✔
757
        goto _OVER;
7✔
758
      }
759

760
      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
655!
761
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
655✔
762
        terrno = err;
151✔
763
        goto _OVER;
151✔
764
      } else {
765
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
504✔
766
        if (pDnode == NULL) goto _OVER;
504!
767
      }
768
    }
769
  }
770

771
  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
107,108✔
772

773
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
107,108✔
774
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
107,108✔
775
  int64_t curMs = taosGetTimestampMs();
107,108✔
776
  bool    online = mndIsDnodeOnline(pDnode, curMs);
107,108✔
777
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
107,108✔
778
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
107,108✔
779
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
107,108✔
780
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
107,108✔
781
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
107,108✔
782
  bool    analVerChanged = (analVer != statusReq.analVer);
107,108✔
783
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
103,737✔
784
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
210,845!
785
  const STraceId *trace = &pReq->info.traceId;
107,108✔
786
  char            timestamp[TD_TIME_STR_LEN] = {0};
107,108✔
787
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
107,108✔
788
  mGTrace(
107,108!
789
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
790
      "timestamp:%s",
791
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
792

793
  if (reboot) {
107,108✔
794
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
3,440✔
795
  }
796

797
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
353,934✔
798
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
246,826✔
799

800
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
246,826✔
801
    if (pVgroup != NULL) {
246,826✔
802
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
243,954!
803
        pVgroup->cacheUsage = pVload->cacheUsage;
185,388✔
804
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
185,388✔
805
        pVgroup->numOfTables = pVload->numOfTables;
185,388✔
806
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
185,388✔
807
        pVgroup->totalStorage = pVload->totalStorage;
185,388✔
808
        pVgroup->compStorage = pVload->compStorage;
185,388✔
809
        pVgroup->pointsWritten = pVload->pointsWritten;
185,388✔
810
      }
811
      bool stateChanged = false;
243,954✔
812
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
324,033✔
813
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
323,895✔
814
        if (pGid->dnodeId == statusReq.dnodeId) {
323,895✔
815
          if (pVload->startTimeMs == 0) {
243,816!
816
            pVload->startTimeMs = statusReq.rebootTime;
×
817
          }
818
          if (pVload->roleTimeMs == 0) {
243,816!
819
            pVload->roleTimeMs = statusReq.rebootTime;
×
820
          }
821
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
243,816✔
822
          break;
243,816✔
823
        }
824
      }
825
      if (stateChanged) {
243,954✔
826
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
18,295✔
827
        if (pDb != NULL && pDb->stateTs != curMs) {
18,295✔
828
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
11,709!
829
                pDb->stateTs, curMs);
830
          pDb->stateTs = curMs;
11,709✔
831
        }
832
        mndReleaseDb(pMnode, pDb);
18,295✔
833
      }
834
    }
835

836
    mndReleaseVgroup(pMnode, pVgroup);
246,826✔
837
  }
838

839
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
107,108✔
840
  if (pObj != NULL) {
107,108✔
841
    if (statusReq.mload.roleTimeMs == 0) {
64,844✔
842
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
2,277✔
843
    }
844
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
64,844✔
845
    mndReleaseMnode(pMnode, pObj);
64,844✔
846
  }
847

848
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
107,108✔
849
  if (pQnode != NULL) {
107,108✔
850
    pQnode->load = statusReq.qload;
15,305✔
851
    mndReleaseQnode(pMnode, pQnode);
15,305✔
852
  }
853

854
  if (needCheck) {
107,108✔
855
    if (statusReq.sver != tsVersion) {
12,245!
856
      if (pDnode != NULL) {
×
857
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
858
      }
859
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
860
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
861
      goto _OVER;
×
862
    }
863

864
    if (statusReq.dnodeId == 0) {
12,245✔
865
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
3,064!
866
    } else {
867
      if (statusReq.clusterId != pMnode->clusterId) {
9,181!
868
        if (pDnode != NULL) {
×
869
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
870
        }
871
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
872
               pMnode->clusterId);
873
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
874
        goto _OVER;
×
875
      }
876
    }
877

878
    // Verify whether the cluster parameters are consistent when status change from offline to ready
879
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
12,245✔
880
    if (pDnode->offlineReason != 0) {
12,245!
881
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
×
882
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
×
883
      goto _OVER;
×
884
    }
885

886
    if (!online) {
12,245✔
887
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
3,371!
888
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
889
    } else {
890
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
8,874!
891
            statusReq.dnodeVer, dnodeVer, reboot);
892
    }
893

894
    pDnode->rebootTime = statusReq.rebootTime;
12,245✔
895
    pDnode->numOfCores = statusReq.numOfCores;
12,245✔
896
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
12,245✔
897
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
12,245✔
898
    pDnode->memAvail = statusReq.memAvail;
12,245✔
899
    pDnode->memTotal = statusReq.memTotal;
12,245✔
900
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
12,245✔
901
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
12,245✔
902
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
12,245✔
903
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
5,269✔
904
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
5,269!
905
        goto _OVER;
×
906
      }
907
    }
908

909
    SStatusRsp statusRsp = {0};
12,245✔
910
    statusRsp.statusSeq++;
12,245✔
911
    statusRsp.analVer = analVer;
12,245✔
912
    statusRsp.dnodeVer = dnodeVer;
12,245✔
913
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
12,245✔
914
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
12,245✔
915
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
12,245✔
916
    if (statusRsp.pDnodeEps == NULL) {
12,245!
917
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
918
      goto _OVER;
×
919
    }
920

921
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
12,245✔
922
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
12,245✔
923

924
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
12,245✔
925
    void   *pHead = rpcMallocCont(contLen);
12,245✔
926
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
12,245✔
927
    taosArrayDestroy(statusRsp.pDnodeEps);
12,245✔
928
    if (contLen < 0) {
12,245!
929
      code = contLen;
×
930
      goto _OVER;
×
931
    }
932

933
    pReq->info.rspLen = contLen;
12,245✔
934
    pReq->info.rsp = pHead;
12,245✔
935
  }
936

937
  pDnode->accessTimes++;
107,108✔
938
  pDnode->lastAccessTime = curMs;
107,108✔
939
  code = 0;
107,108✔
940

941
_OVER:
108,733✔
942
  mndReleaseDnode(pMnode, pDnode);
108,733✔
943
  taosArrayDestroy(statusReq.pVloads);
108,733✔
944
  return mndUpdClusterInfo(pReq);
108,733✔
945
}
946

947
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
948
  SMnode    *pMnode = pReq->info.node;
×
949
  SNotifyReq notifyReq = {0};
×
950
  int32_t    code = 0;
×
951

952
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
953
    terrno = code;
×
954
    goto _OVER;
×
955
  }
956

957
  int64_t clusterid = mndGetClusterId(pMnode);
×
958
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
959
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
960
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
961
          notifyReq.clusterId, clusterid, tstrerror(code));
962
    goto _OVER;
×
963
  }
964

965
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
966
  for (int32_t v = 0; v < nVgroup; ++v) {
×
967
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
968

969
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
970
    if (pVgroup != NULL) {
×
971
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
972
      mndReleaseVgroup(pMnode, pVgroup);
×
973
    }
974
  }
975
  code = mndUpdClusterInfo(pReq);
×
976
_OVER:
×
977
  tFreeSNotifyReq(&notifyReq);
×
978
  return code;
×
979
}
980

981
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
603✔
982
  int32_t  code = -1;
603✔
983
  SSdbRaw *pRaw = NULL;
603✔
984
  STrans  *pTrans = NULL;
603✔
985

986
  SDnodeObj dnodeObj = {0};
603✔
987
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
603✔
988
  dnodeObj.createdTime = taosGetTimestampMs();
603✔
989
  dnodeObj.updateTime = dnodeObj.createdTime;
603✔
990
  dnodeObj.port = pCreate->port;
603✔
991
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
603✔
992
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
603✔
993

994
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
603✔
995
  if (pTrans == NULL) {
603!
996
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
997
    if (terrno != 0) code = terrno;
×
998
    goto _OVER;
×
999
  }
1000
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
603!
1001
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
603!
1002

1003
  pRaw = mndDnodeActionEncode(&dnodeObj);
603✔
1004
  if (pRaw == NULL) {
603!
1005
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1006
    if (terrno != 0) code = terrno;
×
1007
    goto _OVER;
×
1008
  }
1009
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
603!
1010
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
603!
1011
  pRaw = NULL;
603✔
1012

1013
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
603!
1014
  code = 0;
603✔
1015

1016
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
603✔
1017
                                   1);  // TODO: check the return value
1018
_OVER:
603✔
1019
  mndTransDrop(pTrans);
603✔
1020
  sdbFreeRaw(pRaw);
603✔
1021
  return code;
603✔
1022
}
1023

1024
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
210✔
1025
  SMnode       *pMnode = pReq->info.node;
210✔
1026
  SSdb         *pSdb = pMnode->pSdb;
210✔
1027
  SDnodeObj    *pObj = NULL;
210✔
1028
  void         *pIter = NULL;
210✔
1029
  SDnodeListRsp rsp = {0};
210✔
1030
  int32_t       code = -1;
210✔
1031

1032
  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
210✔
1033
  if (NULL == rsp.dnodeList) {
210!
1034
    mError("failed to alloc epSet while process dnode list req");
×
1035
    code = terrno;
×
1036
    goto _OVER;
×
1037
  }
1038

1039
  while (1) {
542✔
1040
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
752✔
1041
    if (pIter == NULL) break;
752✔
1042

1043
    SDNodeAddr dnodeAddr = {0};
542✔
1044
    dnodeAddr.nodeId = pObj->id;
542✔
1045
    dnodeAddr.epSet.numOfEps = 1;
542✔
1046
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
542✔
1047
    dnodeAddr.epSet.eps[0].port = pObj->port;
542✔
1048

1049
    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
1,084!
1050
      if (terrno != 0) code = terrno;
×
1051
      sdbRelease(pSdb, pObj);
×
1052
      sdbCancelFetch(pSdb, pIter);
×
1053
      goto _OVER;
×
1054
    }
1055

1056
    sdbRelease(pSdb, pObj);
542✔
1057
  }
1058

1059
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
210✔
1060
  void   *pRsp = rpcMallocCont(rspLen);
210✔
1061
  if (pRsp == NULL) {
210!
1062
    code = terrno;
×
1063
    goto _OVER;
×
1064
  }
1065

1066
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
210!
1067
    code = rspLen;
×
1068
    goto _OVER;
×
1069
  }
1070

1071
  pReq->info.rspLen = rspLen;
210✔
1072
  pReq->info.rsp = pRsp;
210✔
1073
  code = 0;
210✔
1074

1075
_OVER:
210✔
1076

1077
  if (code != 0) {
210!
1078
    mError("failed to get dnode list since %s", tstrerror(code));
×
1079
  }
1080

1081
  tFreeSDnodeListRsp(&rsp);
210✔
1082

1083
  TAOS_RETURN(code);
210✔
1084
}
1085

1086
void getSlowLogScopeString(int32_t scope, char *result) {
7✔
1087
  if (scope == SLOW_LOG_TYPE_NULL) {
7!
1088
    (void)strncat(result, "NONE", 64);
×
1089
    return;
×
1090
  }
1091
  while (scope > 0) {
14✔
1092
    if (scope & SLOW_LOG_TYPE_QUERY) {
7!
1093
      (void)strncat(result, "QUERY", 64);
7✔
1094
      scope &= ~SLOW_LOG_TYPE_QUERY;
7✔
1095
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1096
      (void)strncat(result, "INSERT", 64);
×
1097
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1098
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1099
      (void)strncat(result, "OTHERS", 64);
×
1100
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1101
    } else {
1102
      (void)printf("invalid slow log scope:%d", scope);
×
1103
      return;
×
1104
    }
1105

1106
    if (scope > 0) {
7!
1107
      (void)strncat(result, "|", 64);
×
1108
    }
1109
  }
1110
}
1111

1112
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
604✔
1113
  SMnode         *pMnode = pReq->info.node;
604✔
1114
  int32_t         code = -1;
604✔
1115
  SDnodeObj      *pDnode = NULL;
604✔
1116
  SCreateDnodeReq createReq = {0};
604✔
1117
  int32_t         lino = 0;
604✔
1118

1119
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
604!
1120
    goto _OVER;
×
1121
  }
1122

1123
  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
604✔
1124
  TAOS_CHECK_GOTO(code, &lino, _OVER);
604!
1125

1126
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
604!
1127
  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE);
604✔
1128
  TAOS_CHECK_GOTO(code, &lino, _OVER);
604✔
1129

1130
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
603!
1131
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1132
    goto _OVER;
×
1133
  }
1134
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
1135
  // if (code != 0) {
1136
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
1137
  //          tstrerror(code));
1138
  //   goto _OVER;
1139
  // }
1140

1141
  char ep[TSDB_EP_LEN];
1142
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
603✔
1143
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
603✔
1144
  if (pDnode != NULL) {
603!
1145
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1146
    goto _OVER;
×
1147
  }
1148

1149
  code = mndCreateDnode(pMnode, pReq, &createReq);
603✔
1150
  if (code == 0) {
603!
1151
    code = TSDB_CODE_ACTION_IN_PROGRESS;
603✔
1152
    tsGrantHBInterval = 5;
603✔
1153
  }
1154

1155
  char obj[200] = {0};
603✔
1156
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
603✔
1157

1158
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
603✔
1159

1160
_OVER:
604✔
1161
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
604!
1162
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
1!
1163
  }
1164

1165
  mndReleaseDnode(pMnode, pDnode);
604✔
1166
  tFreeSCreateDnodeReq(&createReq);
604✔
1167
  TAOS_RETURN(code);
604✔
1168
}
1169

1170
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1171

1172
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
10✔
1173

1174
#ifndef TD_ENTERPRISE
1175
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1176
#endif
1177

1178
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
48✔
1179
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1180
  int32_t  code = -1;
48✔
1181
  SSdbRaw *pRaw = NULL;
48✔
1182
  STrans  *pTrans = NULL;
48✔
1183
  int32_t  lino = 0;
48✔
1184

1185
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
48✔
1186
  if (pTrans == NULL) {
48!
1187
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1188
    if (terrno != 0) code = terrno;
×
1189
    goto _OVER;
×
1190
  }
1191
  mndTransSetGroupParallel(pTrans);
48✔
1192
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
48!
1193
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
48!
1194

1195
  pRaw = mndDnodeActionEncode(pDnode);
48✔
1196
  if (pRaw == NULL) {
48!
1197
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1198
    if (terrno != 0) code = terrno;
×
1199
    goto _OVER;
×
1200
  }
1201
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
48!
1202
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
48!
1203
  pRaw = NULL;
48✔
1204

1205
  pRaw = mndDnodeActionEncode(pDnode);
48✔
1206
  if (pRaw == NULL) {
48!
1207
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1208
    if (terrno != 0) code = terrno;
×
1209
    goto _OVER;
×
1210
  }
1211
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
48!
1212
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
48!
1213
  pRaw = NULL;
48✔
1214

1215
  if (pMObj != NULL) {
48✔
1216
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
7!
1217
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
7!
1218
  }
1219

1220
  if (pQObj != NULL) {
48✔
1221
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
6!
1222
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
6!
1223
  }
1224

1225
  if (pSObj != NULL) {
48✔
1226
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
6!
1227
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), &lino, _OVER);
6!
1228
  }
1229

1230
  if (pBObj != NULL) {
48✔
1231
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1232
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
3!
1233
  }
1234

1235
  if (numOfVnodes > 0) {
45✔
1236
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
27!
1237
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
27✔
1238
  }
1239

1240
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
42!
1241

1242
  code = 0;
42✔
1243

1244
_OVER:
48✔
1245
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
48!
1246
  mndTransDrop(pTrans);
48✔
1247
  sdbFreeRaw(pRaw);
48✔
1248
  TAOS_RETURN(code);
48✔
1249
}
1250

1251
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
1252
  bool       isEmpty = false;
×
1253
  SMnodeObj *pMObj = NULL;
×
1254
  SQnodeObj *pQObj = NULL;
×
1255
  SSnodeObj *pSObj = NULL;
×
1256

1257
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
1258
  if (pQObj) goto _OVER;
×
1259

1260
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
1261
  if (pSObj) goto _OVER;
×
1262

1263
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
1264
  if (pMObj) goto _OVER;
×
1265

1266
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
1267
  if (numOfVnodes > 0) goto _OVER;
×
1268

1269
  isEmpty = true;
×
1270
_OVER:
×
1271
  mndReleaseMnode(pMnode, pMObj);
×
1272
  mndReleaseQnode(pMnode, pQObj);
×
1273
  mndReleaseSnode(pMnode, pSObj);
×
1274
  return isEmpty;
×
1275
}
1276

1277
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
60✔
1278
  SMnode       *pMnode = pReq->info.node;
60✔
1279
  int32_t       code = -1;
60✔
1280
  SDnodeObj    *pDnode = NULL;
60✔
1281
  SMnodeObj    *pMObj = NULL;
60✔
1282
  SQnodeObj    *pQObj = NULL;
60✔
1283
  SSnodeObj    *pSObj = NULL;
60✔
1284
  SBnodeObj    *pBObj = NULL;
60✔
1285
  SDropDnodeReq dropReq = {0};
60✔
1286

1287
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
60!
1288

1289
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
60!
1290
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1291
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
60✔
1292

1293
  bool force = dropReq.force;
59✔
1294
  if (dropReq.unsafe) {
59✔
1295
    force = true;
2✔
1296
  }
1297

1298
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
59✔
1299
  if (pDnode == NULL) {
59✔
1300
    int32_t err = terrno;
1✔
1301
    char    ep[TSDB_EP_LEN + 1] = {0};
1✔
1302
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
1✔
1303
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
1✔
1304
    if (pDnode == NULL) {
1!
1305
      code = err;
1✔
1306
      goto _OVER;
1✔
1307
    }
1308
  }
1309

1310
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
58✔
1311
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
58✔
1312
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
58✔
1313
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
58✔
1314
  if (pMObj != NULL) {
58✔
1315
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
15✔
1316
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
3✔
1317
      goto _OVER;
3✔
1318
    }
1319
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
12✔
1320
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
3✔
1321
      goto _OVER;
3✔
1322
    }
1323
  }
1324

1325
#ifdef USE_MOUNT
1326
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
52!
1327
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
1328
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1329
    goto _OVER;
×
1330
  }
1331
#endif
1332

1333
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
52✔
1334
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
52✔
1335

1336
  if (isonline && force) {
52!
1337
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1338
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1339
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1340
    goto _OVER;
×
1341
  }
1342

1343
  bool    vnodeOffline = false;
52✔
1344
  void   *pIter = NULL;
52✔
1345
  int32_t vgId = -1;
52✔
1346
  while (1) {
67✔
1347
    SVgObj *pVgroup = NULL;
119✔
1348
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
119✔
1349
    if (pIter == NULL) break;
119✔
1350

1351
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
222✔
1352
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
155✔
1353
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
59✔
1354
          vgId = pVgroup->vgId;
8✔
1355
          vnodeOffline = true;
8✔
1356
          break;
8✔
1357
        }
1358
      }
1359
    }
1360

1361
    sdbRelease(pMnode->pSdb, pVgroup);
75✔
1362

1363
    if (vnodeOffline) {
75✔
1364
      sdbCancelFetch(pMnode->pSdb, pIter);
8✔
1365
      break;
8✔
1366
    }
1367
  }
1368

1369
  if (vnodeOffline && !force) {
52✔
1370
    code = TSDB_CODE_VND_VNODE_OFFLINE;
4✔
1371
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
4!
1372
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1373
    goto _OVER;
4✔
1374
  }
1375

1376
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
48✔
1377
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
48✔
1378

1379
  char obj1[30] = {0};
48✔
1380
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
48✔
1381

1382
  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
48✔
1383

1384
_OVER:
60✔
1385
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
60!
1386
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
18!
1387
  }
1388

1389
  mndReleaseDnode(pMnode, pDnode);
60✔
1390
  mndReleaseMnode(pMnode, pMObj);
60✔
1391
  mndReleaseQnode(pMnode, pQObj);
60✔
1392
  mndReleaseBnode(pMnode, pBObj);
60✔
1393
  mndReleaseSnode(pMnode, pSObj);
60✔
1394
  tFreeSDropDnodeReq(&dropReq);
60✔
1395
  TAOS_RETURN(code);
60✔
1396
}
1397

1398
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
2✔
1399
  int32_t code = 0;
2✔
1400
  SMnode *pMnode = pReq->info.node;
2✔
1401
  SSdb   *pSdb = pMnode->pSdb;
2✔
1402
  void   *pIter = NULL;
2✔
1403
  int8_t  encrypting = 0;
2✔
1404

1405
  const STraceId *trace = &pReq->info.traceId;
2✔
1406

1407
  int32_t klen = strlen(pDcfgReq->value);
2✔
1408
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
2!
1409
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1410
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1411
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1412
    goto _exit;
×
1413
  }
1414

1415
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
2!
1416
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1417
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1418
    goto _exit;
×
1419
  }
1420

1421
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
2!
1422
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1423
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1424
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1425
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1426
    goto _exit;
×
1427
  }
1428

1429
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
2✔
1430
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
2✔
1431
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
2✔
1432

1433
  while (1) {
2✔
1434
    SDnodeObj *pDnode = NULL;
4✔
1435
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
4✔
1436
    if (pIter == NULL) break;
4✔
1437
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
2!
1438
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1439
             offlineReason[pDnode->offlineReason]);
1440
      sdbRelease(pSdb, pDnode);
×
1441
      continue;
×
1442
    }
1443

1444
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
2!
1445
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
2✔
1446
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
2✔
1447
      void   *pBuf = rpcMallocCont(bufLen);
2✔
1448

1449
      if (pBuf != NULL) {
2!
1450
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
2!
1451
          code = bufLen;
×
1452
          sdbRelease(pSdb, pDnode);
×
1453
          goto _exit;
×
1454
        }
1455
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
2✔
1456
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
2!
1457
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
2✔
1458
        }
1459
      }
1460
    }
1461

1462
    sdbRelease(pSdb, pDnode);
2✔
1463
  }
1464

1465
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
2!
1466
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1467
  }
1468

1469
_exit:
2✔
1470
  if (code != 0) {
2!
1471
    if (terrno == 0) terrno = code;
×
1472
  }
1473
  return code;
2✔
1474
}
1475

1476
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
2✔
1477
  int32_t code = 0;
2✔
1478

1479
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
1480
  SMnode       *pMnode = pReq->info.node;
2✔
1481
  SMCfgDnodeReq cfgReq = {0};
2✔
1482
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
2!
1483

1484
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
2!
1485
    tFreeSMCfgDnodeReq(&cfgReq);
×
1486
    TAOS_RETURN(code);
×
1487
  }
1488
  const STraceId *trace = &pReq->info.traceId;
2✔
1489
  SDCfgDnodeReq   dcfgReq = {0};
2✔
1490
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
2!
1491
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
2✔
1492
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
2✔
1493
    tFreeSMCfgDnodeReq(&cfgReq);
2✔
1494
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
2✔
1495
  } else {
1496
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1497
    tFreeSMCfgDnodeReq(&cfgReq);
×
1498
    TAOS_RETURN(code);
×
1499
  }
1500

1501
#else
1502
  TAOS_RETURN(code);
1503
#endif
1504
}
1505

1506
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
2✔
1507
  SMnode *pMnode = pRsp->info.node;
2✔
1508
  int16_t nSuccess = 0;
2✔
1509
  int16_t nFailed = 0;
2✔
1510

1511
  if (0 == pRsp->code) {
2!
1512
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
2✔
1513
  } else {
1514
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1515
  }
1516

1517
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
2✔
1518
  bool    finished = nSuccess + nFailed >= nReq;
2✔
1519

1520
  if (finished) {
2!
1521
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
2✔
1522
  }
1523

1524
  const STraceId *trace = &pRsp->info.traceId;
2✔
1525
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
2!
1526
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1527

1528
  return 0;
2✔
1529
}
1530

1531
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
7✔
1532
  SMnode *pMnode = pReq->info.node;
7✔
1533
  int32_t totalRows = 0;
7✔
1534
  int32_t numOfRows = 0;
7✔
1535
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
7✔
1536
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
7✔
1537
  char   *pWrite = NULL;
7✔
1538
  int32_t cols = 0;
7✔
1539
  int32_t code = 0;
7✔
1540
  int32_t lino = 0;
7✔
1541

1542
  cfgOpts[totalRows] = "statusInterval";
7✔
1543
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
7✔
1544
  totalRows++;
7✔
1545

1546
  cfgOpts[totalRows] = "timezone";
7✔
1547
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
7✔
1548
  totalRows++;
7✔
1549

1550
  cfgOpts[totalRows] = "locale";
7✔
1551
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
7✔
1552
  totalRows++;
7✔
1553

1554
  cfgOpts[totalRows] = "charset";
7✔
1555
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
7✔
1556
  totalRows++;
7✔
1557

1558
  cfgOpts[totalRows] = "monitor";
7✔
1559
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
7✔
1560
  totalRows++;
7✔
1561

1562
  cfgOpts[totalRows] = "monitorInterval";
7✔
1563
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
7✔
1564
  totalRows++;
7✔
1565

1566
  cfgOpts[totalRows] = "slowLogThreshold";
7✔
1567
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
7✔
1568
  totalRows++;
7✔
1569

1570
  cfgOpts[totalRows] = "slowLogMaxLen";
7✔
1571
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
7✔
1572
  totalRows++;
7✔
1573

1574
  char scopeStr[64] = {0};
7✔
1575
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
7✔
1576
  cfgOpts[totalRows] = "slowLogScope";
7✔
1577
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
7✔
1578
  totalRows++;
7✔
1579

1580
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1581
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1582

1583
  for (int32_t i = 0; i < totalRows; i++) {
70✔
1584
    cols = 0;
63✔
1585

1586
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
63✔
1587
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1588
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
63!
1589

1590
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
63✔
1591
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1592
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
63!
1593

1594
    numOfRows++;
63✔
1595
  }
1596

1597
_OVER:
7✔
1598
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
7!
1599
  pShow->numOfRows += numOfRows;
7✔
1600
  return numOfRows;
7✔
1601
}
1602

1603
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1604

1605
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
1,920✔
1606
  SMnode    *pMnode = pReq->info.node;
1,920✔
1607
  SSdb      *pSdb = pMnode->pSdb;
1,920✔
1608
  int32_t    numOfRows = 0;
1,920✔
1609
  int32_t    cols = 0;
1,920✔
1610
  ESdbStatus objStatus = 0;
1,920✔
1611
  SDnodeObj *pDnode = NULL;
1,920✔
1612
  int64_t    curMs = taosGetTimestampMs();
1,920✔
1613
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
1614
  int32_t    code = 0;
1,920✔
1615
  int32_t    lino = 0;
1,920✔
1616

1617
  while (numOfRows < rows) {
8,352!
1618
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
8,352✔
1619
    if (pShow->pIter == NULL) break;
8,352✔
1620
    bool online = mndIsDnodeOnline(pDnode, curMs);
6,432✔
1621

1622
    cols = 0;
6,432✔
1623

1624
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1625
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
6,432!
1626

1627
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
6,432✔
1628

1629
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1630
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
6,432!
1631

1632
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1633
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
6,432✔
1634
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
6,432!
1635

1636
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1637
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
6,432!
1638
                        &lino, _OVER);
1639

1640
    const char *status = "ready";
6,432✔
1641
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
6,432!
1642
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
6,432!
1643
    if (!online) {
6,432✔
1644
      if (objStatus == SDB_STATUS_CREATING)
1,087!
1645
        status = "creating*";
×
1646
      else if (objStatus == SDB_STATUS_DROPPING)
1,087!
1647
        status = "dropping*";
×
1648
      else
1649
        status = "offline";
1,087✔
1650
    }
1651

1652
    STR_TO_VARSTR(buf, status);
6,432✔
1653
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1654
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
6,432!
1655

1656
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1657
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
6,432!
1658
                        _OVER);
1659

1660
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1661
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
6,432!
1662
                        _OVER);
1663

1664
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
6,432!
1665
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
6,432✔
1666

1667
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1668
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
6,432!
1669
    taosMemoryFreeClear(b);
6,432!
1670

1671
#ifdef TD_ENTERPRISE
1672
    STR_TO_VARSTR(buf, pDnode->machineId);
6,432✔
1673
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
6,432✔
1674
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
6,432!
1675
#endif
1676

1677
    numOfRows++;
6,432✔
1678
    sdbRelease(pSdb, pDnode);
6,432✔
1679
  }
1680

1681
_OVER:
×
1682
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
1,920!
1683

1684
  pShow->numOfRows += numOfRows;
1,920✔
1685
  return numOfRows;
1,920✔
1686
}
1687

1688
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1689
  SSdb *pSdb = pMnode->pSdb;
×
1690
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1691
}
×
1692

1693
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
4,109✔
1694
  SDnodeObj *pObj = NULL;
4,109✔
1695
  void      *pIter = NULL;
4,109✔
1696
  SSdb      *pSdb = pMnode->pSdb;
4,109✔
1697
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
4,109✔
1698
  while (1) {
3,769✔
1699
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
7,878✔
1700
    if (pIter == NULL) break;
7,878✔
1701

1702
    char *fqdn = taosStrdup(pObj->fqdn);
3,769!
1703
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
3,769!
1704
      mError("failed to fqdn into array, but continue at this time");
×
1705
    }
1706
    sdbRelease(pSdb, pObj);
3,769✔
1707
  }
1708
  return fqdns;
4,109✔
1709
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc