• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.74
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
static const char *offlineReason[] = {
42
    "",
43
    "status msg timeout",
44
    "status not received",
45
    "version not match",
46
    "dnodeId not match",
47
    "clusterId not match",
48
    "statusInterval not match",
49
    "timezone not match",
50
    "locale not match",
51
    "charset not match",
52
    "ttlChangeOnWrite not match",
53
    "enableWhiteList not match",
54
    "encryptionKey not match",
55
    "monitor not match",
56
    "monitor switch not match",
57
    "monitor interval not match",
58
    "monitor slow log threshold not match",
59
    "monitor slow log sql max len not match",
60
    "monitor slow log scope not match",
61
    "unknown",
62
};
63

64
enum {
65
  DND_ACTIVE_CODE,
66
  DND_CONN_ACTIVE_CODE,
67
};
68

69
enum {
70
  DND_CREATE,
71
  DND_ADD,
72
  DND_DROP,
73
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
#ifdef _GRANT
100
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
101
#else
102
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
103
#endif
104

105
int32_t mndInitDnode(SMnode *pMnode) {
1,929✔
106
  SSdbTable table = {
1,929✔
107
      .sdbType = SDB_DNODE,
108
      .keyType = SDB_KEY_INT32,
109
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
110
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
111
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
112
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
113
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
114
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
115
  };
116

117
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
1,929✔
118
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
1,929✔
119
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
1,929✔
120
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
1,929✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
1,929✔
122
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
1,929✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
1,929✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
1,929✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
1,929✔
126
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
1,929✔
127
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
1,929✔
128

129
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
1,929✔
130
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
1,929✔
131
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
1,929✔
132
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
1,929✔
133

134
  return sdbSetTable(pMnode->pSdb, table);
1,929✔
135
}
136

137
void mndCleanupDnode(SMnode *pMnode) {}
1,929✔
138

139
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
1,426✔
140
  int32_t  code = -1;
1,426✔
141
  SSdbRaw *pRaw = NULL;
1,426✔
142
  STrans  *pTrans = NULL;
1,426✔
143

144
  SDnodeObj dnodeObj = {0};
1,426✔
145
  dnodeObj.id = 1;
1,426✔
146
  dnodeObj.createdTime = taosGetTimestampMs();
1,426✔
147
  dnodeObj.updateTime = dnodeObj.createdTime;
1,426✔
148
  dnodeObj.port = tsServerPort;
1,426✔
149
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
1,426✔
150
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
1,426✔
151
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
1,426✔
152
  char *machineId = NULL;
1,426✔
153
  code = tGetMachineId(&machineId);
1,426✔
154
  if (machineId) {
1,426!
155
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
1,426✔
156
    taosMemoryFreeClear(machineId);
1,426!
157
  } else {
158
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
159
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
160
    goto _OVER;
×
161
#endif
162
  }
163

164
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
1,426✔
165
  if (pTrans == NULL) {
1,426!
166
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
167
    if (terrno != 0) code = terrno;
×
168
    goto _OVER;
×
169
  }
170
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
1,426!
171

172
  pRaw = mndDnodeActionEncode(&dnodeObj);
1,426✔
173
  if (pRaw == NULL) {
1,426!
174
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
175
    if (terrno != 0) code = terrno;
×
176
    goto _OVER;
×
177
  }
178
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
1,426!
179
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
1,426!
180
  pRaw = NULL;
1,426✔
181

182
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
1,426!
183
  code = 0;
1,426✔
184
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
1,426✔
185
                                   1);  // TODO: check the return value
186

187
_OVER:
1,426✔
188
  mndTransDrop(pTrans);
1,426✔
189
  sdbFreeRaw(pRaw);
1,426✔
190
  return code;
1,426✔
191
}
192

193
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
11,221✔
194
  int32_t code = 0;
11,221✔
195
  int32_t lino = 0;
11,221✔
196
  terrno = TSDB_CODE_OUT_OF_MEMORY;
11,221✔
197

198
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
11,221✔
199
  if (pRaw == NULL) goto _OVER;
11,221!
200

201
  int32_t dataPos = 0;
11,221✔
202
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
11,221!
203
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
11,221!
204
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
11,221!
205
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
11,221!
206
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
11,221!
207
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
11,221!
208
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
11,221!
209
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
11,221!
210
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
11,221!
211
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
11,221!
212

213
  terrno = 0;
11,221✔
214

215
_OVER:
11,221✔
216
  if (terrno != 0) {
11,221!
217
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
218
    sdbFreeRaw(pRaw);
×
219
    return NULL;
×
220
  }
221

222
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
11,221✔
223
  return pRaw;
11,221✔
224
}
225

226
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
8,688✔
227
  int32_t code = 0;
8,688✔
228
  int32_t lino = 0;
8,688✔
229
  terrno = TSDB_CODE_OUT_OF_MEMORY;
8,688✔
230
  SSdbRow   *pRow = NULL;
8,688✔
231
  SDnodeObj *pDnode = NULL;
8,688✔
232

233
  int8_t sver = 0;
8,688✔
234
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
8,688!
235
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
8,688!
236
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
237
    goto _OVER;
×
238
  }
239

240
  pRow = sdbAllocRow(sizeof(SDnodeObj));
8,688✔
241
  if (pRow == NULL) goto _OVER;
8,688!
242

243
  pDnode = sdbGetRowObj(pRow);
8,688✔
244
  if (pDnode == NULL) goto _OVER;
8,688!
245

246
  int32_t dataPos = 0;
8,688✔
247
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
8,688!
248
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
8,688!
249
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
8,688!
250
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
8,688!
251
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
8,688!
252
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
8,688!
253
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
8,688!
254
  if (sver > 1) {
8,688!
255
    int16_t keyLen = 0;
8,688✔
256
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
8,688!
257
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
8,688!
258
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
8,688!
259
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
8,688!
260
  }
261

262
  terrno = 0;
8,688✔
263
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
8,688!
264
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
265
  }
266

267
_OVER:
8,688✔
268
  if (terrno != 0) {
8,688!
269
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
270
    taosMemoryFreeClear(pRow);
×
271
    return NULL;
×
272
  }
273

274
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
8,688✔
275
  return pRow;
8,688✔
276
}
277

278
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
3,656✔
279
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
3,656✔
280
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
3,656✔
281

282
  char ep[TSDB_EP_LEN] = {0};
3,656✔
283
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
3,656✔
284
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
3,656✔
285
  return 0;
3,656✔
286
}
287

288
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
8,688✔
289
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
8,688✔
290
  return 0;
8,688✔
291
}
292

293
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
5,006✔
294
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
5,006✔
295
  pOld->updateTime = pNew->updateTime;
5,006✔
296
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
297
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
5,006✔
298
#endif
299
  return 0;
5,006✔
300
}
301

302
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
458,579✔
303
  SSdb      *pSdb = pMnode->pSdb;
458,579✔
304
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
458,579✔
305
  if (pDnode == NULL) {
458,580✔
306
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
576✔
307
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
198✔
308
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
378!
309
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
310
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
378!
311
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
378✔
312
    } else {
313
      terrno = TSDB_CODE_APP_ERROR;
×
314
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
315
    }
316
  }
317

318
  return pDnode;
458,580✔
319
}
320

321
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
462,713✔
322
  SSdb *pSdb = pMnode->pSdb;
462,713✔
323
  sdbRelease(pSdb, pDnode);
462,713✔
324
}
462,714✔
325

326
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
29,629✔
327
  SEpSet epSet = {0};
29,629✔
328
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
29,629✔
329
  return epSet;
29,629✔
330
}
331

332
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
2,069✔
333
  SEpSet     epSet = {0};
2,069✔
334
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
2,069✔
335
  if (!pDnode) return epSet;
2,069!
336

337
  epSet = mndGetDnodeEpset(pDnode);
2,069✔
338

339
  mndReleaseDnode(pMnode, pDnode);
2,069✔
340
  return epSet;
2,069✔
341
}
342

343
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
4,388✔
344
  SSdb *pSdb = pMnode->pSdb;
4,388✔
345

346
  void *pIter = NULL;
4,388✔
347
  while (1) {
7,414✔
348
    SDnodeObj *pDnode = NULL;
11,802✔
349
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
11,802✔
350
    if (pIter == NULL) break;
11,802✔
351

352
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
9,804✔
353
      sdbCancelFetch(pSdb, pIter);
2,390✔
354
      return pDnode;
2,390✔
355
    }
356

357
    sdbRelease(pSdb, pDnode);
7,414✔
358
  }
359

360
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
1,998✔
361
  return NULL;
1,998✔
362
}
363

364
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
284✔
365
  SSdb *pSdb = pMnode->pSdb;
284✔
366

367
  void *pIter = NULL;
284✔
368
  while (1) {
323✔
369
    SDnodeObj *pDnode = NULL;
607✔
370
    ESdbStatus objStatus = 0;
607✔
371
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
607✔
372
    if (pIter == NULL) break;
607!
373

374
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
607✔
375
      sdbCancelFetch(pSdb, pIter);
284✔
376
      return pDnode;
284✔
377
    }
378

379
    sdbRelease(pSdb, pDnode);
323✔
380
  }
381

382
  return NULL;
×
383
}
384

385
int32_t mndGetDnodeSize(SMnode *pMnode) {
182,069✔
386
  SSdb *pSdb = pMnode->pSdb;
182,069✔
387
  return sdbGetSize(pSdb, SDB_DNODE);
182,069✔
388
}
389

390
int32_t mndGetDbSize(SMnode *pMnode) {
×
391
  SSdb *pSdb = pMnode->pSdb;
×
392
  return sdbGetSize(pSdb, SDB_DB);
×
393
}
394

395
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
214,131✔
396
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
214,131✔
397
  if (interval > (int64_t)tsStatusTimeoutMs) {
214,131✔
398
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
6,967✔
399
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
88✔
400
    }
401
    return false;
6,967✔
402
  }
403
  return true;
207,164✔
404
}
405

406
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
9,603✔
407
  SSdb *pSdb = pMnode->pSdb;
9,603✔
408

409
  int32_t numOfEps = 0;
9,603✔
410
  void   *pIter = NULL;
9,603✔
411
  while (1) {
31,509✔
412
    SDnodeObj *pDnode = NULL;
41,112✔
413
    ESdbStatus objStatus = 0;
41,112✔
414
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
41,112✔
415
    if (pIter == NULL) break;
41,112✔
416

417
    SDnodeEp dnodeEp = {0};
31,509✔
418
    dnodeEp.id = pDnode->id;
31,509✔
419
    dnodeEp.ep.port = pDnode->port;
31,509✔
420
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
31,509✔
421
    sdbRelease(pSdb, pDnode);
31,509✔
422

423
    dnodeEp.isMnode = 0;
31,509✔
424
    if (mndIsMnode(pMnode, pDnode->id)) {
31,509✔
425
      dnodeEp.isMnode = 1;
14,596✔
426
    }
427
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
31,509!
428
      mError("failed to put ep into array, but continue at this call");
×
429
    }
430
  }
431
}
9,603✔
432

433
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
56,462✔
434
  SSdb   *pSdb = pMnode->pSdb;
56,462✔
435
  int32_t code = 0;
56,462✔
436

437
  int32_t numOfEps = 0;
56,462✔
438
  void   *pIter = NULL;
56,462✔
439
  while (1) {
250,538✔
440
    SDnodeObj *pDnode = NULL;
307,000✔
441
    ESdbStatus objStatus = 0;
307,000✔
442
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
307,000✔
443
    if (pIter == NULL) break;
307,000✔
444

445
    SDnodeInfo dInfo;
446
    dInfo.id = pDnode->id;
250,538✔
447
    dInfo.ep.port = pDnode->port;
250,538✔
448
    dInfo.offlineReason = pDnode->offlineReason;
250,538✔
449
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
250,538✔
450
    sdbRelease(pSdb, pDnode);
250,538✔
451
    if (mndIsMnode(pMnode, pDnode->id)) {
250,538✔
452
      dInfo.isMnode = 1;
87,340✔
453
    } else {
454
      dInfo.isMnode = 0;
163,198✔
455
    }
456

457
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
250,538!
458
      code = terrno;
×
459
      sdbCancelFetch(pSdb, pIter);
×
460
      break;
×
461
    }
462
  }
463
  TAOS_RETURN(code);
56,462✔
464
}
465

466
#define CHECK_MONITOR_PARA(para, err)                                                                    \
467
  if (pCfg->monitorParas.para != para) {                                                                 \
468
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
469
    terrno = err;                                                                                        \
470
    return err;                                                                                          \
471
  }
472

473
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
9,603✔
474
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
9,603!
475
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
9,603!
476
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
9,603!
477
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
9,603!
478
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
9,603!
479

480
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
9,603!
481
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
482
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
483
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
484
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
485
  }
486

487
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
9,603!
488
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
×
489
           tsStatusIntervalMs);
490
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
491
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
492
  }
493

494
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
9,603!
495
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
496
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
497
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
498
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
499
  }
500

501
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
9,603!
502
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
503
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
504
    return DND_REASON_LOCALE_NOT_MATCH;
×
505
  }
506

507
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
9,603!
508
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
509
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
510
    return DND_REASON_CHARSET_NOT_MATCH;
×
511
  }
512

513
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
9,603!
514
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
515
           tsTtlChangeOnWrite);
516
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
517
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
518
  }
519
  int8_t enable = tsEnableWhiteList ? 1 : 0;
9,603✔
520
  if (pCfg->enableWhiteList != enable) {
9,603!
521
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
522
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
523
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
524
  }
525

526
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
9,603!
527
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
9,603!
528
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
529
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
530
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
531
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
532
  }
533

534
  return DND_REASON_ONLINE;
9,603✔
535
}
536

537
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
10✔
538
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
10!
539
    return 0.0;
×
540
  }
541

542
  int64_t deltaCount = currentCount - lastCount;
10✔
543
  int64_t deltaMs = currentTimeMs - lastTimeMs;
10✔
544
  double  rate = (double)deltaCount / (double)deltaMs;
10✔
545
  return rate;
10✔
546
}
547

548
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
251,347✔
549
  bool stateChanged = false;
251,347✔
550
  bool roleChanged = pGid->syncState != pVload->syncState ||
749,849✔
551
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
488,643!
552
                     pGid->roleTimeMs != pVload->roleTimeMs;
237,296✔
553

554
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
251,347✔
555
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
104✔
556
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
93✔
557
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
11✔
558
      int64_t currentTimeMs = taosGetTimestampMs();
10✔
559
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
10✔
560
                                          pGid->lastSyncAppliedIndexUpdateTime);
561

562
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
10✔
563
    }
564
  }
565

566
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
251,347✔
567
  pGid->syncCommitIndex = pVload->syncCommitIndex;
251,347✔
568
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
251,347✔
569
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
251,347✔
570
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
251,347!
571
      pGid->startTimeMs != pVload->startTimeMs) {
236,302!
572
    mInfo(
15,045!
573
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
574
        "canRead:%d, dnode:%d",
575
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
576
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
577
    pGid->syncState = pVload->syncState;
15,045✔
578
    pGid->syncTerm = pVload->syncTerm;
15,045✔
579
    pGid->syncRestore = pVload->syncRestore;
15,045✔
580
    pGid->syncCanRead = pVload->syncCanRead;
15,045✔
581
    pGid->startTimeMs = pVload->startTimeMs;
15,045✔
582
    pGid->roleTimeMs = pVload->roleTimeMs;
15,045✔
583
    stateChanged = true;
15,045✔
584
  }
585
  return stateChanged;
251,347✔
586
}
587

588
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
62,703✔
589
  bool stateChanged = false;
62,703✔
590
  bool roleChanged = pObj->syncState != pMload->syncState ||
186,176✔
591
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
123,399!
592
                     pObj->roleTimeMs != pMload->roleTimeMs;
60,696✔
593
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
62,703✔
594
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
2,042!
595
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
596
          pObj->syncTerm, pMload->syncTerm);
597
    pObj->syncState = pMload->syncState;
2,042✔
598
    pObj->syncTerm = pMload->syncTerm;
2,042✔
599
    pObj->syncRestore = pMload->syncRestore;
2,042✔
600
    pObj->roleTimeMs = pMload->roleTimeMs;
2,042✔
601
    stateChanged = true;
2,042✔
602
  }
603
  return stateChanged;
62,703✔
604
}
605

606
extern char   *tsMonFwUri;
607
extern char   *tsMonSlowLogUri;
608
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
×
609
  SMnode    *pMnode = pReq->info.node;
×
610
  SStatisReq statisReq = {0};
×
611
  int32_t    code = -1;
×
612

613
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
×
614

615
  if (tsMonitorLogProtocol) {
×
616
    mInfo("process statis req,\n %s", statisReq.pCont);
×
617
  }
618

619
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
×
620
    monSendContent(statisReq.pCont, tsMonFwUri);
×
621
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
622
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
623
  }
624

625
  tFreeSStatisReq(&statisReq);
×
626
  return 0;
×
627
}
628

629
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
6,968✔
630
  mTrace("process audit req:%p", pReq);
6,968!
631
  if (tsEnableAudit && tsEnableAuditDelete) {
6,968!
632
    SMnode   *pMnode = pReq->info.node;
6,968✔
633
    SAuditReq auditReq = {0};
6,968✔
634

635
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
6,968!
636

637
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
6,968✔
638

639
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
6,968✔
640
                   auditReq.sqlLen);
641

642
    tFreeSAuditReq(&auditReq);
6,968✔
643
  }
644
  return 0;
6,968✔
645
}
646

647
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
3,742✔
648
  int32_t       code = 0, lino = 0;
3,742✔
649
  SDnodeInfoReq infoReq = {0};
3,742✔
650
  int32_t       contLen = 0;
3,742✔
651
  void         *pReq = NULL;
3,742✔
652

653
  infoReq.dnodeId = pDnode->id;
3,742✔
654
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
3,742✔
655

656
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
3,742!
657
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
658
  }
659
  pReq = rpcMallocCont(contLen);
3,742✔
660
  if (pReq == NULL) {
3,742!
661
    TAOS_RETURN(terrno);
×
662
  }
663

664
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
3,742!
665
    code = contLen;
×
666
    goto _exit;
×
667
  }
668

669
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
3,742✔
670
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
3,742!
671
_exit:
3,742✔
672
  if (code < 0) {
3,742!
673
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
674
  }
675
  TAOS_RETURN(code);
3,742✔
676
}
677

678
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
3,742✔
679
  int32_t       code = 0, lino = 0;
3,742✔
680
  SMnode       *pMnode = pReq->info.node;
3,742✔
681
  SDnodeInfoReq infoReq = {0};
3,742✔
682
  SDnodeObj    *pDnode = NULL;
3,742✔
683
  STrans       *pTrans = NULL;
3,742✔
684
  SSdbRaw      *pCommitRaw = NULL;
3,742✔
685

686
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
3,742!
687

688
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
3,742✔
689
  if (pDnode == NULL) {
3,742✔
690
    TAOS_CHECK_EXIT(terrno);
1!
691
  }
692

693
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
3,741✔
694
  if (pTrans == NULL) {
3,741!
695
    TAOS_CHECK_EXIT(terrno);
×
696
  }
697

698
  pDnode->updateTime = taosGetTimestampMs();
3,741✔
699

700
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
3,741!
701
    TAOS_CHECK_EXIT(terrno);
×
702
  }
703
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
3,741!
704
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
705
    TAOS_CHECK_EXIT(code);
×
706
  }
707
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
3,741!
708
  pCommitRaw = NULL;
3,741✔
709

710
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
3,741!
711
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
712
    TAOS_CHECK_EXIT(code);
×
713
  }
714

715
_exit:
3,741✔
716
  mndReleaseDnode(pMnode, pDnode);
3,742✔
717
  if (code != 0) {
3,742✔
718
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
1!
719
  }
720
  mndTransDrop(pTrans);
3,742✔
721
  sdbFreeRaw(pCommitRaw);
3,742✔
722
  TAOS_RETURN(code);
3,742✔
723
}
724

725
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
96,636✔
726
  SMnode    *pMnode = pReq->info.node;
96,636✔
727
  SStatusReq statusReq = {0};
96,636✔
728
  SDnodeObj *pDnode = NULL;
96,636✔
729
  int32_t    code = -1;
96,636✔
730

731
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
96,636!
732

733
  int64_t clusterid = mndGetClusterId(pMnode);
96,636✔
734
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
96,636!
735
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
736
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
737
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
738
    goto _OVER;
×
739
  }
740

741
  if (statusReq.dnodeId == 0) {
96,636✔
742
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
3,450✔
743
    if (pDnode == NULL) {
3,450✔
744
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
1,068!
745
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
1,068✔
746
      if (terrno != 0) code = terrno;
1,068!
747
      goto _OVER;
1,068✔
748
    }
749
  } else {
750
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
93,186✔
751
    if (pDnode == NULL) {
93,186✔
752
      int32_t err = terrno;
452✔
753
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
452✔
754
      if (pDnode != NULL) {
452✔
755
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
8✔
756
        terrno = err;
8✔
757
        goto _OVER;
8✔
758
      }
759

760
      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
444!
761
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
444✔
762
        terrno = err;
160✔
763
        goto _OVER;
160✔
764
      } else {
765
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
284✔
766
        if (pDnode == NULL) goto _OVER;
284!
767
      }
768
    }
769
  }
770

771
  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
95,400✔
772

773
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
95,400✔
774
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
95,400✔
775
  int64_t curMs = taosGetTimestampMs();
95,400✔
776
  bool    online = mndIsDnodeOnline(pDnode, curMs);
95,400✔
777
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
95,400✔
778
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
95,400✔
779
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
95,400✔
780
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
95,400✔
781
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
95,400✔
782
  bool    analVerChanged = (analVer != statusReq.analVer);
95,400✔
783
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
92,768!
784
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
188,168!
785
  const STraceId *trace = &pReq->info.traceId;
95,400✔
786
  char            timestamp[TD_TIME_STR_LEN] = {0};
95,400✔
787
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
95,400✔
788
  mGTrace(
95,400!
789
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
790
      "timestamp:%s",
791
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
792

793
  if (reboot) {
95,400✔
794
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
2,729✔
795
  }
796

797
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
349,271✔
798
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
253,871✔
799

800
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
253,871✔
801
    if (pVgroup != NULL) {
253,871✔
802
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
251,409✔
803
        pVgroup->cacheUsage = pVload->cacheUsage;
188,499✔
804
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
188,499✔
805
        pVgroup->numOfTables = pVload->numOfTables;
188,499✔
806
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
188,499✔
807
        pVgroup->totalStorage = pVload->totalStorage;
188,499✔
808
        pVgroup->compStorage = pVload->compStorage;
188,499✔
809
        pVgroup->pointsWritten = pVload->pointsWritten;
188,499✔
810
      }
811
      bool stateChanged = false;
251,409✔
812
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
338,250✔
813
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
338,188✔
814
        if (pGid->dnodeId == statusReq.dnodeId) {
338,188✔
815
          if (pVload->startTimeMs == 0) {
251,347!
816
            pVload->startTimeMs = statusReq.rebootTime;
×
817
          }
818
          if (pVload->roleTimeMs == 0) {
251,347!
819
            pVload->roleTimeMs = statusReq.rebootTime;
×
820
          }
821
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
251,347✔
822
          break;
251,347✔
823
        }
824
      }
825
      if (stateChanged) {
251,409✔
826
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,045✔
827
        if (pDb != NULL && pDb->stateTs != curMs) {
15,045✔
828
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
9,409!
829
                pDb->stateTs, curMs);
830
          pDb->stateTs = curMs;
9,409✔
831
        }
832
        mndReleaseDb(pMnode, pDb);
15,045✔
833
      }
834
    }
835

836
    mndReleaseVgroup(pMnode, pVgroup);
253,871✔
837
  }
838

839
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
95,400✔
840
  if (pObj != NULL) {
95,400✔
841
    if (statusReq.mload.roleTimeMs == 0) {
62,703✔
842
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
1,591✔
843
    }
844
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
62,703✔
845
    mndReleaseMnode(pMnode, pObj);
62,703✔
846
  }
847

848
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
95,400✔
849
  if (pQnode != NULL) {
95,400✔
850
    pQnode->load = statusReq.qload;
18,014✔
851
    mndReleaseQnode(pMnode, pQnode);
18,014✔
852
  }
853

854
  if (needCheck) {
95,400✔
855
    if (statusReq.sver != tsVersion) {
9,603!
856
      if (pDnode != NULL) {
×
857
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
858
      }
859
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
860
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
861
      goto _OVER;
×
862
    }
863

864
    if (statusReq.dnodeId == 0) {
9,603✔
865
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
2,382!
866
    } else {
867
      if (statusReq.clusterId != pMnode->clusterId) {
7,221!
868
        if (pDnode != NULL) {
×
869
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
870
        }
871
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
872
               pMnode->clusterId);
873
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
874
        goto _OVER;
×
875
      }
876
    }
877

878
    // Verify whether the cluster parameters are consistent when status change from offline to ready
879
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
9,603✔
880
    if (pDnode->offlineReason != 0) {
9,603!
881
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
×
882
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
×
883
      goto _OVER;
×
884
    }
885

886
    if (!online) {
9,603✔
887
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
2,632!
888
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
889
    } else {
890
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
6,971!
891
            statusReq.dnodeVer, dnodeVer, reboot);
892
    }
893

894
    pDnode->rebootTime = statusReq.rebootTime;
9,603✔
895
    pDnode->numOfCores = statusReq.numOfCores;
9,603✔
896
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
9,603✔
897
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
9,603✔
898
    pDnode->memAvail = statusReq.memAvail;
9,603✔
899
    pDnode->memTotal = statusReq.memTotal;
9,603✔
900
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
9,603✔
901
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
9,603✔
902
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
9,603✔
903
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
3,742✔
904
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
3,742!
905
        goto _OVER;
×
906
      }
907
    }
908

909
    SStatusRsp statusRsp = {0};
9,603✔
910
    statusRsp.statusSeq++;
9,603✔
911
    statusRsp.analVer = analVer;
9,603✔
912
    statusRsp.dnodeVer = dnodeVer;
9,603✔
913
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
9,603✔
914
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
9,603✔
915
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
9,603✔
916
    if (statusRsp.pDnodeEps == NULL) {
9,603!
917
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
918
      goto _OVER;
×
919
    }
920

921
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
9,603✔
922
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
9,603✔
923

924
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
9,603✔
925
    void   *pHead = rpcMallocCont(contLen);
9,603✔
926
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
9,603✔
927
    taosArrayDestroy(statusRsp.pDnodeEps);
9,603✔
928
    if (contLen < 0) {
9,603!
929
      code = contLen;
×
930
      goto _OVER;
×
931
    }
932

933
    pReq->info.rspLen = contLen;
9,603✔
934
    pReq->info.rsp = pHead;
9,603✔
935
  }
936

937
  pDnode->accessTimes++;
95,400✔
938
  pDnode->lastAccessTime = curMs;
95,400✔
939
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
95,400!
940
    pDnode->offlineReason = DND_REASON_ONLINE;
1✔
941
  }
942
  code = 0;
95,400✔
943

944
_OVER:
96,636✔
945
  mndReleaseDnode(pMnode, pDnode);
96,636✔
946
  taosArrayDestroy(statusReq.pVloads);
96,636✔
947
  return mndUpdClusterInfo(pReq);
96,636✔
948
}
949

950
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
951
  SMnode    *pMnode = pReq->info.node;
×
952
  SNotifyReq notifyReq = {0};
×
953
  int32_t    code = 0;
×
954

955
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
956
    terrno = code;
×
957
    goto _OVER;
×
958
  }
959

960
  int64_t clusterid = mndGetClusterId(pMnode);
×
961
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
962
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
963
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
964
          notifyReq.clusterId, clusterid, tstrerror(code));
965
    goto _OVER;
×
966
  }
967

968
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
969
  for (int32_t v = 0; v < nVgroup; ++v) {
×
970
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
971

972
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
973
    if (pVgroup != NULL) {
×
974
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
975
      mndReleaseVgroup(pMnode, pVgroup);
×
976
    }
977
  }
978
  code = mndUpdClusterInfo(pReq);
×
979
_OVER:
×
980
  tFreeSNotifyReq(&notifyReq);
×
981
  return code;
×
982
}
983

984
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
486✔
985
  int32_t  code = -1;
486✔
986
  SSdbRaw *pRaw = NULL;
486✔
987
  STrans  *pTrans = NULL;
486✔
988

989
  SDnodeObj dnodeObj = {0};
486✔
990
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
486✔
991
  dnodeObj.createdTime = taosGetTimestampMs();
486✔
992
  dnodeObj.updateTime = dnodeObj.createdTime;
486✔
993
  dnodeObj.port = pCreate->port;
486✔
994
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
486✔
995
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
486✔
996

997
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
486✔
998
  if (pTrans == NULL) {
486!
999
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1000
    if (terrno != 0) code = terrno;
×
1001
    goto _OVER;
×
1002
  }
1003
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
486!
1004
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
486!
1005

1006
  pRaw = mndDnodeActionEncode(&dnodeObj);
486✔
1007
  if (pRaw == NULL) {
486!
1008
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1009
    if (terrno != 0) code = terrno;
×
1010
    goto _OVER;
×
1011
  }
1012
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
486!
1013
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
486!
1014
  pRaw = NULL;
486✔
1015

1016
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
486!
1017
  code = 0;
486✔
1018

1019
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
486✔
1020
                                   1);  // TODO: check the return value
1021
_OVER:
486✔
1022
  mndTransDrop(pTrans);
486✔
1023
  sdbFreeRaw(pRaw);
486✔
1024
  return code;
486✔
1025
}
1026

1027
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
295✔
1028
  SMnode       *pMnode = pReq->info.node;
295✔
1029
  SSdb         *pSdb = pMnode->pSdb;
295✔
1030
  SDnodeObj    *pObj = NULL;
295✔
1031
  void         *pIter = NULL;
295✔
1032
  SDnodeListRsp rsp = {0};
295✔
1033
  int32_t       code = -1;
295✔
1034

1035
  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
295✔
1036
  if (NULL == rsp.dnodeList) {
295!
1037
    mError("failed to alloc epSet while process dnode list req");
×
1038
    code = terrno;
×
1039
    goto _OVER;
×
1040
  }
1041

1042
  while (1) {
616✔
1043
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
911✔
1044
    if (pIter == NULL) break;
911✔
1045

1046
    SDNodeAddr dnodeAddr = {0};
616✔
1047
    dnodeAddr.nodeId = pObj->id;
616✔
1048
    dnodeAddr.epSet.numOfEps = 1;
616✔
1049
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
616✔
1050
    dnodeAddr.epSet.eps[0].port = pObj->port;
616✔
1051

1052
    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
1,232!
1053
      if (terrno != 0) code = terrno;
×
1054
      sdbRelease(pSdb, pObj);
×
1055
      sdbCancelFetch(pSdb, pIter);
×
1056
      goto _OVER;
×
1057
    }
1058

1059
    sdbRelease(pSdb, pObj);
616✔
1060
  }
1061

1062
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
295✔
1063
  void   *pRsp = rpcMallocCont(rspLen);
295✔
1064
  if (pRsp == NULL) {
295!
1065
    code = terrno;
×
1066
    goto _OVER;
×
1067
  }
1068

1069
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
295!
1070
    code = rspLen;
×
1071
    goto _OVER;
×
1072
  }
1073

1074
  pReq->info.rspLen = rspLen;
295✔
1075
  pReq->info.rsp = pRsp;
295✔
1076
  code = 0;
295✔
1077

1078
_OVER:
295✔
1079

1080
  if (code != 0) {
295!
1081
    mError("failed to get dnode list since %s", tstrerror(code));
×
1082
  }
1083

1084
  tFreeSDnodeListRsp(&rsp);
295✔
1085

1086
  TAOS_RETURN(code);
295✔
1087
}
1088

1089
void getSlowLogScopeString(int32_t scope, char *result) {
7✔
1090
  if (scope == SLOW_LOG_TYPE_NULL) {
7!
1091
    (void)strncat(result, "NONE", 64);
×
1092
    return;
×
1093
  }
1094
  while (scope > 0) {
14✔
1095
    if (scope & SLOW_LOG_TYPE_QUERY) {
7!
1096
      (void)strncat(result, "QUERY", 64);
7✔
1097
      scope &= ~SLOW_LOG_TYPE_QUERY;
7✔
1098
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1099
      (void)strncat(result, "INSERT", 64);
×
1100
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1101
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1102
      (void)strncat(result, "OTHERS", 64);
×
1103
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1104
    } else {
1105
      (void)printf("invalid slow log scope:%d", scope);
×
1106
      return;
×
1107
    }
1108

1109
    if (scope > 0) {
7!
1110
      (void)strncat(result, "|", 64);
×
1111
    }
1112
  }
1113
}
1114

1115
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
487✔
1116
  SMnode         *pMnode = pReq->info.node;
487✔
1117
  int32_t         code = -1;
487✔
1118
  SDnodeObj      *pDnode = NULL;
487✔
1119
  SCreateDnodeReq createReq = {0};
487✔
1120
  int32_t         lino = 0;
487✔
1121

1122
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
487!
1123
    goto _OVER;
×
1124
  }
1125

1126
  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
487✔
1127
  TAOS_CHECK_GOTO(code, &lino, _OVER);
487!
1128

1129
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
487!
1130
  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE);
487✔
1131
  TAOS_CHECK_GOTO(code, &lino, _OVER);
487✔
1132

1133
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
486!
1134
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1135
    goto _OVER;
×
1136
  }
1137
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
1138
  // if (code != 0) {
1139
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
1140
  //          tstrerror(code));
1141
  //   goto _OVER;
1142
  // }
1143

1144
  char ep[TSDB_EP_LEN];
1145
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
486✔
1146
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
486✔
1147
  if (pDnode != NULL) {
486!
1148
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1149
    goto _OVER;
×
1150
  }
1151

1152
  code = mndCreateDnode(pMnode, pReq, &createReq);
486✔
1153
  if (code == 0) {
486!
1154
    code = TSDB_CODE_ACTION_IN_PROGRESS;
486✔
1155
    tsGrantHBInterval = 5;
486✔
1156
  }
1157

1158
  char obj[200] = {0};
486✔
1159
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
486✔
1160

1161
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
486✔
1162

1163
_OVER:
487✔
1164
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
487!
1165
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
1!
1166
  }
1167

1168
  mndReleaseDnode(pMnode, pDnode);
487✔
1169
  tFreeSCreateDnodeReq(&createReq);
487✔
1170
  TAOS_RETURN(code);
487✔
1171
}
1172

1173
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1174

1175
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
6✔
1176

1177
#ifndef TD_ENTERPRISE
1178
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1179
#endif
1180

1181
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
26✔
1182
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1183
  int32_t  code = -1;
26✔
1184
  SSdbRaw *pRaw = NULL;
26✔
1185
  STrans  *pTrans = NULL;
26✔
1186
  int32_t  lino = 0;
26✔
1187

1188
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
26✔
1189
  if (pTrans == NULL) {
26!
1190
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1191
    if (terrno != 0) code = terrno;
×
1192
    goto _OVER;
×
1193
  }
1194
  mndTransSetGroupParallel(pTrans);
26✔
1195
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
26!
1196
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
26!
1197

1198
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1199
  if (pRaw == NULL) {
26!
1200
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1201
    if (terrno != 0) code = terrno;
×
1202
    goto _OVER;
×
1203
  }
1204
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
26!
1205
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
26!
1206
  pRaw = NULL;
26✔
1207

1208
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1209
  if (pRaw == NULL) {
26!
1210
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1211
    if (terrno != 0) code = terrno;
×
1212
    goto _OVER;
×
1213
  }
1214
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
26!
1215
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
26!
1216
  pRaw = NULL;
26✔
1217

1218
  if (pSObj != NULL) {
26✔
1219
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1220
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
3!
1221
  }
1222

1223
  if (pMObj != NULL) {
26✔
1224
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
2!
1225
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
2!
1226
  }
1227

1228
  if (pQObj != NULL) {
26✔
1229
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1!
1230
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
1!
1231
  }
1232

1233
  if (pBObj != NULL) {
26✔
1234
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1235
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
3!
1236
  }
1237

1238
  if (numOfVnodes > 0) {
23✔
1239
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
14!
1240
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
14!
1241
  }
1242

1243
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
23!
1244

1245
  code = 0;
23✔
1246

1247
_OVER:
26✔
1248
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
26!
1249
  mndTransDrop(pTrans);
26✔
1250
  sdbFreeRaw(pRaw);
26✔
1251
  TAOS_RETURN(code);
26✔
1252
}
1253

1254
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
1255
  bool       isEmpty = false;
×
1256
  SMnodeObj *pMObj = NULL;
×
1257
  SQnodeObj *pQObj = NULL;
×
1258
  SSnodeObj *pSObj = NULL;
×
1259

1260
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
1261
  if (pQObj) goto _OVER;
×
1262

1263
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
1264
  if (pSObj) goto _OVER;
×
1265

1266
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
1267
  if (pMObj) goto _OVER;
×
1268

1269
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
1270
  if (numOfVnodes > 0) goto _OVER;
×
1271

1272
  isEmpty = true;
×
1273
_OVER:
×
1274
  mndReleaseMnode(pMnode, pMObj);
×
1275
  mndReleaseQnode(pMnode, pQObj);
×
1276
  mndReleaseSnode(pMnode, pSObj);
×
1277
  return isEmpty;
×
1278
}
1279

1280
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
32✔
1281
  SMnode       *pMnode = pReq->info.node;
32✔
1282
  int32_t       code = -1;
32✔
1283
  SDnodeObj    *pDnode = NULL;
32✔
1284
  SMnodeObj    *pMObj = NULL;
32✔
1285
  SQnodeObj    *pQObj = NULL;
32✔
1286
  SSnodeObj    *pSObj = NULL;
32✔
1287
  SBnodeObj    *pBObj = NULL;
32✔
1288
  SDropDnodeReq dropReq = {0};
32✔
1289

1290
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
32!
1291

1292
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
32!
1293
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1294
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
32✔
1295

1296
  bool force = dropReq.force;
31✔
1297
  if (dropReq.unsafe) {
31!
1298
    force = true;
×
1299
  }
1300

1301
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
31✔
1302
  if (pDnode == NULL) {
31!
1303
    int32_t err = terrno;
×
1304
    char    ep[TSDB_EP_LEN + 1] = {0};
×
1305
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
1306
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
1307
    if (pDnode == NULL) {
×
1308
      code = err;
×
1309
      goto _OVER;
×
1310
    }
1311
  }
1312

1313
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
31✔
1314
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
31✔
1315
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
31✔
1316
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
31✔
1317
  if (pMObj != NULL) {
31✔
1318
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
7✔
1319
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
3✔
1320
      goto _OVER;
3✔
1321
    }
1322
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
4✔
1323
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
2✔
1324
      goto _OVER;
2✔
1325
    }
1326
  }
1327

1328
#ifdef USE_MOUNT
1329
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
26!
1330
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
1331
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1332
    goto _OVER;
×
1333
  }
1334
#endif
1335

1336
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
26✔
1337
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
26✔
1338

1339
  if (isonline && force) {
26!
1340
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1341
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1342
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1343
    goto _OVER;
×
1344
  }
1345

1346
  mError("vnode num:%d", numOfVnodes);
26!
1347

1348
  bool    vnodeOffline = false;
26✔
1349
  void   *pIter = NULL;
26✔
1350
  int32_t vgId = -1;
26✔
1351
  while (1) {
39✔
1352
    SVgObj *pVgroup = NULL;
65✔
1353
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
65✔
1354
    if (pIter == NULL) break;
65✔
1355

1356
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
120✔
1357
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
81!
1358
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
81✔
1359
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
29!
1360
          vgId = pVgroup->vgId;
×
1361
          vnodeOffline = true;
×
1362
          break;
×
1363
        }
1364
      }
1365
    }
1366

1367
    sdbRelease(pMnode->pSdb, pVgroup);
39✔
1368

1369
    if (vnodeOffline) {
39!
1370
      sdbCancelFetch(pMnode->pSdb, pIter);
×
1371
      break;
×
1372
    }
1373
  }
1374

1375
  if (vnodeOffline && !force) {
26!
1376
    code = TSDB_CODE_VND_VNODE_OFFLINE;
×
1377
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
×
1378
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1379
    goto _OVER;
×
1380
  }
1381

1382
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
26✔
1383
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
26✔
1384

1385
  char obj1[30] = {0};
26✔
1386
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
26✔
1387

1388
  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
26✔
1389

1390
_OVER:
32✔
1391
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
32!
1392
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
9!
1393
  }
1394

1395
  mndReleaseDnode(pMnode, pDnode);
32✔
1396
  mndReleaseMnode(pMnode, pMObj);
32✔
1397
  mndReleaseQnode(pMnode, pQObj);
32✔
1398
  mndReleaseBnode(pMnode, pBObj);
32✔
1399
  mndReleaseSnode(pMnode, pSObj);
32✔
1400
  tFreeSDropDnodeReq(&dropReq);
32✔
1401
  TAOS_RETURN(code);
32✔
1402
}
1403

1404
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
3✔
1405
  int32_t code = 0;
3✔
1406
  SMnode *pMnode = pReq->info.node;
3✔
1407
  SSdb   *pSdb = pMnode->pSdb;
3✔
1408
  void   *pIter = NULL;
3✔
1409
  int8_t  encrypting = 0;
3✔
1410

1411
  const STraceId *trace = &pReq->info.traceId;
3✔
1412

1413
  int32_t klen = strlen(pDcfgReq->value);
3✔
1414
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
3!
1415
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1416
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1417
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1418
    goto _exit;
×
1419
  }
1420

1421
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
3!
1422
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1423
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1424
    goto _exit;
×
1425
  }
1426

1427
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
3!
1428
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1429
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1430
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1431
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1432
    goto _exit;
×
1433
  }
1434

1435
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
3✔
1436
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
3✔
1437
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
3✔
1438

1439
  while (1) {
5✔
1440
    SDnodeObj *pDnode = NULL;
8✔
1441
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
8✔
1442
    if (pIter == NULL) break;
8✔
1443
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
5!
1444
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1445
             offlineReason[pDnode->offlineReason]);
1446
      sdbRelease(pSdb, pDnode);
×
1447
      continue;
×
1448
    }
1449

1450
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
5!
1451
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
5✔
1452
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
5✔
1453
      void   *pBuf = rpcMallocCont(bufLen);
5✔
1454

1455
      if (pBuf != NULL) {
5!
1456
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
5!
1457
          code = bufLen;
×
1458
          sdbRelease(pSdb, pDnode);
×
1459
          goto _exit;
×
1460
        }
1461
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
5✔
1462
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
5!
1463
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
5✔
1464
        }
1465
      }
1466
    }
1467

1468
    sdbRelease(pSdb, pDnode);
5✔
1469
  }
1470

1471
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
3!
1472
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1473
  }
1474

1475
_exit:
3✔
1476
  if (code != 0) {
3!
1477
    if (terrno == 0) terrno = code;
×
1478
  }
1479
  return code;
3✔
1480
}
1481

1482
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
3✔
1483
  int32_t code = 0;
3✔
1484

1485
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
1486
  SMnode       *pMnode = pReq->info.node;
3✔
1487
  SMCfgDnodeReq cfgReq = {0};
3✔
1488
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
3!
1489

1490
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
3!
1491
    tFreeSMCfgDnodeReq(&cfgReq);
×
1492
    TAOS_RETURN(code);
×
1493
  }
1494
  const STraceId *trace = &pReq->info.traceId;
3✔
1495
  SDCfgDnodeReq   dcfgReq = {0};
3✔
1496
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
3!
1497
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
3✔
1498
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
3✔
1499
    tFreeSMCfgDnodeReq(&cfgReq);
3✔
1500
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
3✔
1501
  } else {
1502
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1503
    tFreeSMCfgDnodeReq(&cfgReq);
×
1504
    TAOS_RETURN(code);
×
1505
  }
1506

1507
#else
1508
  TAOS_RETURN(code);
1509
#endif
1510
}
1511

1512
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
5✔
1513
  SMnode *pMnode = pRsp->info.node;
5✔
1514
  int16_t nSuccess = 0;
5✔
1515
  int16_t nFailed = 0;
5✔
1516

1517
  if (0 == pRsp->code) {
5!
1518
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
5✔
1519
  } else {
1520
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1521
  }
1522

1523
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
5✔
1524
  bool    finished = nSuccess + nFailed >= nReq;
5✔
1525

1526
  if (finished) {
5✔
1527
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
3✔
1528
  }
1529

1530
  const STraceId *trace = &pRsp->info.traceId;
5✔
1531
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
5!
1532
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1533

1534
  return 0;
5✔
1535
}
1536

1537
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
7✔
1538
  SMnode *pMnode = pReq->info.node;
7✔
1539
  int32_t totalRows = 0;
7✔
1540
  int32_t numOfRows = 0;
7✔
1541
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
7✔
1542
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
7✔
1543
  char   *pWrite = NULL;
7✔
1544
  int32_t cols = 0;
7✔
1545
  int32_t code = 0;
7✔
1546
  int32_t lino = 0;
7✔
1547

1548
  cfgOpts[totalRows] = "statusIntervalMs";
7✔
1549
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
7✔
1550
  totalRows++;
7✔
1551

1552
  cfgOpts[totalRows] = "timezone";
7✔
1553
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
7✔
1554
  totalRows++;
7✔
1555

1556
  cfgOpts[totalRows] = "locale";
7✔
1557
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
7✔
1558
  totalRows++;
7✔
1559

1560
  cfgOpts[totalRows] = "charset";
7✔
1561
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
7✔
1562
  totalRows++;
7✔
1563

1564
  cfgOpts[totalRows] = "monitor";
7✔
1565
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
7✔
1566
  totalRows++;
7✔
1567

1568
  cfgOpts[totalRows] = "monitorInterval";
7✔
1569
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
7✔
1570
  totalRows++;
7✔
1571

1572
  cfgOpts[totalRows] = "slowLogThreshold";
7✔
1573
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
7✔
1574
  totalRows++;
7✔
1575

1576
  cfgOpts[totalRows] = "slowLogMaxLen";
7✔
1577
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
7✔
1578
  totalRows++;
7✔
1579

1580
  char scopeStr[64] = {0};
7✔
1581
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
7✔
1582
  cfgOpts[totalRows] = "slowLogScope";
7✔
1583
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
7✔
1584
  totalRows++;
7✔
1585

1586
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1587
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1588

1589
  for (int32_t i = 0; i < totalRows; i++) {
70✔
1590
    cols = 0;
63✔
1591

1592
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
63✔
1593
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1594
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
63!
1595

1596
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
63✔
1597
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1598
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
63!
1599

1600
    numOfRows++;
63✔
1601
  }
1602

1603
_OVER:
7✔
1604
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
7!
1605
  pShow->numOfRows += numOfRows;
7✔
1606
  return numOfRows;
7✔
1607
}
1608

1609
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1610

1611
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
2,477✔
1612
  SMnode    *pMnode = pReq->info.node;
2,477✔
1613
  SSdb      *pSdb = pMnode->pSdb;
2,477✔
1614
  int32_t    numOfRows = 0;
2,477✔
1615
  int32_t    cols = 0;
2,477✔
1616
  ESdbStatus objStatus = 0;
2,477✔
1617
  SDnodeObj *pDnode = NULL;
2,477✔
1618
  int64_t    curMs = taosGetTimestampMs();
2,477✔
1619
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
1620
  int32_t    code = 0;
2,477✔
1621
  int32_t    lino = 0;
2,477✔
1622

1623
  while (numOfRows < rows) {
7,943!
1624
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
7,943✔
1625
    if (pShow->pIter == NULL) break;
7,943✔
1626
    bool online = mndIsDnodeOnline(pDnode, curMs);
5,466✔
1627

1628
    cols = 0;
5,466✔
1629

1630
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1631
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
5,466!
1632

1633
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
5,466✔
1634

1635
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1636
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
5,466!
1637

1638
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1639
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
5,466✔
1640
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
5,466!
1641

1642
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1643
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
5,466!
1644
                        &lino, _OVER);
1645

1646
    const char *status = "ready";
5,466✔
1647
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
5,466!
1648
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
5,466!
1649
    if (!online) {
5,466✔
1650
      if (objStatus == SDB_STATUS_CREATING)
455!
1651
        status = "creating*";
×
1652
      else if (objStatus == SDB_STATUS_DROPPING)
455!
1653
        status = "dropping*";
×
1654
      else
1655
        status = "offline";
455✔
1656
    }
1657

1658
    STR_TO_VARSTR(buf, status);
5,466✔
1659
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1660
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
5,466!
1661

1662
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1663
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
5,466!
1664
                        _OVER);
1665

1666
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1667
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
5,466!
1668
                        _OVER);
1669

1670
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
5,466!
1671
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
5,466✔
1672

1673
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1674
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
5,466!
1675
    taosMemoryFreeClear(b);
5,466!
1676

1677
#ifdef TD_ENTERPRISE
1678
    STR_TO_VARSTR(buf, pDnode->machineId);
5,466✔
1679
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5,466✔
1680
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
5,466!
1681
#endif
1682

1683
    numOfRows++;
5,466✔
1684
    sdbRelease(pSdb, pDnode);
5,466✔
1685
  }
1686

1687
_OVER:
×
1688
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
2,477!
1689

1690
  pShow->numOfRows += numOfRows;
2,477✔
1691
  return numOfRows;
2,477✔
1692
}
1693

1694
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1695
  SSdb *pSdb = pMnode->pSdb;
×
1696
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1697
}
×
1698

1699
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
4,504✔
1700
  SDnodeObj *pObj = NULL;
4,504✔
1701
  void      *pIter = NULL;
4,504✔
1702
  SSdb      *pSdb = pMnode->pSdb;
4,504✔
1703
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
4,504✔
1704
  while (1) {
4,524✔
1705
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
9,028✔
1706
    if (pIter == NULL) break;
9,028✔
1707

1708
    char *fqdn = taosStrdup(pObj->fqdn);
4,524!
1709
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
4,524!
1710
      mError("failed to fqdn into array, but continue at this time");
×
1711
    }
1712
    sdbRelease(pSdb, pObj);
4,524✔
1713
  }
1714
  return fqdns;
4,504✔
1715
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc