• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4404

30 Jun 2025 02:45AM UTC coverage: 62.241% (-0.4%) from 62.635%
#4404

push

travis-ci

web-flow
Merge pull request #31480 from taosdata/docs/3.0/TD-34215

add stmt2 docs

153837 of 315978 branches covered (48.69%)

Branch coverage included in aggregate %.

238272 of 314005 relevant lines covered (75.88%)

6134648.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.64
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndCluster.h"
21
#include "mndDb.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "mndBnode.h"
31
#include "taos_monitor.h"
32
#include "tconfig.h"
33
#include "tjson.h"
34
#include "tmisce.h"
35
#include "tunit.h"
36

37
#define TSDB_DNODE_VER_NUMBER   2
38
#define TSDB_DNODE_RESERVE_SIZE 40
39

40
static const char *offlineReason[] = {
41
    "",
42
    "status msg timeout",
43
    "status not received",
44
    "version not match",
45
    "dnodeId not match",
46
    "clusterId not match",
47
    "statusInterval not match",
48
    "timezone not match",
49
    "locale not match",
50
    "charset not match",
51
    "ttlChangeOnWrite not match",
52
    "enableWhiteList not match",
53
    "encryptionKey not match",
54
    "monitor not match",
55
    "monitor switch not match",
56
    "monitor interval not match",
57
    "monitor slow log threshold not match",
58
    "monitor slow log sql max len not match",
59
    "monitor slow log scope not match",
60
    "unknown",
61
};
62

63
enum {
64
  DND_ACTIVE_CODE,
65
  DND_CONN_ACTIVE_CODE,
66
};
67

68
enum {
69
  DND_CREATE,
70
  DND_ADD,
71
  DND_DROP,
72
};
73

74
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
75
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
76
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
77
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
80
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
81

82
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
85
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
86
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
87
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
88
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
89
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
90
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
91
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
92

93
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
94
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
95
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
96
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
97

98
#ifdef _GRANT
99
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
100
#else
101
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
102
#endif
103

104
int32_t mndInitDnode(SMnode *pMnode) {
2,162✔
105
  SSdbTable table = {
2,162✔
106
      .sdbType = SDB_DNODE,
107
      .keyType = SDB_KEY_INT32,
108
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
109
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
110
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
111
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
112
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
113
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
114
  };
115

116
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
2,162✔
117
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
2,162✔
118
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
2,162✔
119
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
2,162✔
120
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
2,162✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
2,162✔
122
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
2,162✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
2,162✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
2,162✔
125
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
2,162✔
126
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
2,162✔
127

128
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
2,162✔
129
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
2,162✔
130
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
2,162✔
131
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
2,162✔
132

133
  return sdbSetTable(pMnode->pSdb, table);
2,162✔
134
}
135

136
void          mndCleanupDnode(SMnode *pMnode) {}
2,161✔
137

138
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
1,624✔
139
  int32_t  code = -1;
1,624✔
140
  SSdbRaw *pRaw = NULL;
1,624✔
141
  STrans  *pTrans = NULL;
1,624✔
142

143
  SDnodeObj dnodeObj = {0};
1,624✔
144
  dnodeObj.id = 1;
1,624✔
145
  dnodeObj.createdTime = taosGetTimestampMs();
1,624✔
146
  dnodeObj.updateTime = dnodeObj.createdTime;
1,624✔
147
  dnodeObj.port = tsServerPort;
1,624✔
148
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
1,624✔
149
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
1,624✔
150
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
1,624✔
151
  char *machineId = NULL;
1,624✔
152
  code = tGetMachineId(&machineId);
1,624✔
153
  if (machineId) {
1,624!
154
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
1,624✔
155
    taosMemoryFreeClear(machineId);
1,624!
156
  } else {
157
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
158
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
159
    goto _OVER;
×
160
#endif
161
  }
162

163
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
1,624✔
164
  if (pTrans == NULL) {
1,624!
165
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
166
    if (terrno != 0) code = terrno;
×
167
    goto _OVER;
×
168
  }
169
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
1,624!
170

171
  pRaw = mndDnodeActionEncode(&dnodeObj);
1,624✔
172
  if (pRaw == NULL) {
1,624!
173
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
174
    if (terrno != 0) code = terrno;
×
175
    goto _OVER;
×
176
  }
177
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
1,624!
178
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
1,624!
179
  pRaw = NULL;
1,624✔
180

181
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
1,624!
182
  code = 0;
1,624✔
183
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
1,624✔
184
                                   1);  // TODO: check the return value
185

186
_OVER:
1,624✔
187
  mndTransDrop(pTrans);
1,624✔
188
  sdbFreeRaw(pRaw);
1,624✔
189
  return code;
1,624✔
190
}
191

192
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
12,489✔
193
  int32_t code = 0;
12,489✔
194
  int32_t lino = 0;
12,489✔
195
  terrno = TSDB_CODE_OUT_OF_MEMORY;
12,489✔
196

197
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
12,489✔
198
  if (pRaw == NULL) goto _OVER;
12,489!
199

200
  int32_t dataPos = 0;
12,489✔
201
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
12,489!
202
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
12,489!
203
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
12,489!
204
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
12,489!
205
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
12,489!
206
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
12,489!
207
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
12,489!
208
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
12,489!
209
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
12,489!
210
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
12,489!
211

212
  terrno = 0;
12,489✔
213

214
_OVER:
12,489✔
215
  if (terrno != 0) {
12,489!
216
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
217
    sdbFreeRaw(pRaw);
×
218
    return NULL;
×
219
  }
220

221
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
12,489✔
222
  return pRaw;
12,489✔
223
}
224

225
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
9,730✔
226
  int32_t code = 0;
9,730✔
227
  int32_t lino = 0;
9,730✔
228
  terrno = TSDB_CODE_OUT_OF_MEMORY;
9,730✔
229
  SSdbRow   *pRow = NULL;
9,730✔
230
  SDnodeObj *pDnode = NULL;
9,730✔
231

232
  int8_t sver = 0;
9,730✔
233
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
9,730!
234
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
9,730!
235
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
236
    goto _OVER;
×
237
  }
238

239
  pRow = sdbAllocRow(sizeof(SDnodeObj));
9,730✔
240
  if (pRow == NULL) goto _OVER;
9,730!
241

242
  pDnode = sdbGetRowObj(pRow);
9,730✔
243
  if (pDnode == NULL) goto _OVER;
9,730!
244

245
  int32_t dataPos = 0;
9,730✔
246
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
9,730!
247
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
9,730!
248
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
9,730!
249
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
9,730!
250
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
9,730!
251
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
9,730!
252
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
9,730!
253
  if (sver > 1) {
9,730!
254
    int16_t keyLen = 0;
9,730✔
255
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
9,730!
256
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
9,730!
257
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
9,730!
258
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
9,730!
259
  }
260

261
  terrno = 0;
9,730✔
262
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
9,730!
263
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
264
  }
265

266
_OVER:
9,730✔
267
  if (terrno != 0) {
9,730!
268
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
269
    taosMemoryFreeClear(pRow);
×
270
    return NULL;
×
271
  }
272

273
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
9,730✔
274
  return pRow;
9,730✔
275
}
276

277
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
3,928✔
278
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
3,928✔
279
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
3,928✔
280

281
  char ep[TSDB_EP_LEN] = {0};
3,928✔
282
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
3,928✔
283
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
3,928✔
284
  return 0;
3,928✔
285
}
286

287
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
9,729✔
288
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
9,729✔
289
  return 0;
9,729✔
290
}
291

292
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
5,773✔
293
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
5,773✔
294
  pOld->updateTime = pNew->updateTime;
5,773✔
295
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
296
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
5,773✔
297
#endif
298
  return 0;
5,773✔
299
}
300

301
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
601,215✔
302
  SSdb      *pSdb = pMnode->pSdb;
601,215✔
303
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
601,215✔
304
  if (pDnode == NULL) {
601,220✔
305
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
461✔
306
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
119✔
307
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
342!
308
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
309
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
342!
310
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
342✔
311
    } else {
312
      terrno = TSDB_CODE_APP_ERROR;
×
313
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
314
    }
315
  }
316

317
  return pDnode;
601,220✔
318
}
319

320
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
604,917✔
321
  SSdb *pSdb = pMnode->pSdb;
604,917✔
322
  sdbRelease(pSdb, pDnode);
604,917✔
323
}
604,922✔
324

325
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
27,070✔
326
  SEpSet epSet = {0};
27,070✔
327
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
27,070✔
328
  return epSet;
27,070✔
329
}
330

331
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
×
332
  SEpSet     epSet = {0};
×
333
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
334
  if (!pDnode) return epSet;
×
335

336
  epSet = mndGetDnodeEpset(pDnode);
×
337

338
  mndReleaseDnode(pMnode, pDnode);
×
339
  return epSet;
×
340
}
341

342
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
4,427✔
343
  SSdb *pSdb = pMnode->pSdb;
4,427✔
344

345
  void *pIter = NULL;
4,427✔
346
  while (1) {
5,010✔
347
    SDnodeObj *pDnode = NULL;
9,437✔
348
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
9,437✔
349
    if (pIter == NULL) break;
9,437✔
350

351
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
7,589✔
352
      sdbCancelFetch(pSdb, pIter);
2,579✔
353
      return pDnode;
2,579✔
354
    }
355

356
    sdbRelease(pSdb, pDnode);
5,010✔
357
  }
358

359
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
1,848✔
360
  return NULL;
1,848✔
361
}
362

363
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
248✔
364
  SSdb *pSdb = pMnode->pSdb;
248✔
365

366
  void *pIter = NULL;
248✔
367
  while (1) {
248✔
368
    SDnodeObj *pDnode = NULL;
496✔
369
    ESdbStatus objStatus = 0;
496✔
370
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
496✔
371
    if (pIter == NULL) break;
496!
372

373
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
496✔
374
      sdbCancelFetch(pSdb, pIter);
248✔
375
      return pDnode;
248✔
376
    }
377

378
    sdbRelease(pSdb, pDnode);
248✔
379
  }
380

381
  return NULL;
×
382
}
383

384
int32_t mndGetDnodeSize(SMnode *pMnode) {
190,263✔
385
  SSdb *pSdb = pMnode->pSdb;
190,263✔
386
  return sdbGetSize(pSdb, SDB_DNODE);
190,263✔
387
}
388

389
int32_t mndGetDbSize(SMnode *pMnode) {
×
390
  SSdb *pSdb = pMnode->pSdb;
×
391
  return sdbGetSize(pSdb, SDB_DB);
×
392
}
393

394
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
202,069✔
395
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
202,069✔
396
  if (interval > 5000 * (int64_t)tsStatusInterval) {
202,069✔
397
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
9,489✔
398
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
199✔
399
    }
400
    return false;
9,489✔
401
  }
402
  return true;
192,580✔
403
}
404

405
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
10,459✔
406
  SSdb *pSdb = pMnode->pSdb;
10,459✔
407

408
  int32_t numOfEps = 0;
10,459✔
409
  void   *pIter = NULL;
10,459✔
410
  while (1) {
30,768✔
411
    SDnodeObj *pDnode = NULL;
41,227✔
412
    ESdbStatus objStatus = 0;
41,227✔
413
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
41,227✔
414
    if (pIter == NULL) break;
41,227✔
415

416
    SDnodeEp dnodeEp = {0};
30,768✔
417
    dnodeEp.id = pDnode->id;
30,768✔
418
    dnodeEp.ep.port = pDnode->port;
30,768✔
419
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
30,768✔
420
    sdbRelease(pSdb, pDnode);
30,768✔
421

422
    dnodeEp.isMnode = 0;
30,768✔
423
    if (mndIsMnode(pMnode, pDnode->id)) {
30,768✔
424
      dnodeEp.isMnode = 1;
14,674✔
425
    }
426
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
30,768!
427
      mError("failed to put ep into array, but continue at this call");
×
428
    }
429
  }
430
}
10,459✔
431

432
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
51,731✔
433
  SSdb   *pSdb = pMnode->pSdb;
51,731✔
434
  int32_t code = 0;
51,731✔
435

436
  int32_t numOfEps = 0;
51,731✔
437
  void   *pIter = NULL;
51,731✔
438
  while (1) {
225,040✔
439
    SDnodeObj *pDnode = NULL;
276,771✔
440
    ESdbStatus objStatus = 0;
276,771✔
441
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
276,771✔
442
    if (pIter == NULL) break;
276,771✔
443

444
    SDnodeInfo dInfo;
445
    dInfo.id = pDnode->id;
225,040✔
446
    dInfo.ep.port = pDnode->port;
225,040✔
447
    dInfo.offlineReason = pDnode->offlineReason;
225,040✔
448
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
225,040✔
449
    sdbRelease(pSdb, pDnode);
225,040✔
450
    if (mndIsMnode(pMnode, pDnode->id)) {
225,040✔
451
      dInfo.isMnode = 1;
73,666✔
452
    } else {
453
      dInfo.isMnode = 0;
151,374✔
454
    }
455

456
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
225,040!
457
      code = terrno;
×
458
      sdbCancelFetch(pSdb, pIter);
×
459
      break;
×
460
    }
461
  }
462
  TAOS_RETURN(code);
51,731✔
463
}
464

465
#define CHECK_MONITOR_PARA(para, err)                                                                    \
466
  if (pCfg->monitorParas.para != para) {                                                                 \
467
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
468
    terrno = err;                                                                                        \
469
    return err;                                                                                          \
470
  }
471

472
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
10,459✔
473
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
10,459!
474
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
10,459!
475
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
10,459!
476
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
10,459!
477
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
10,459!
478

479
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
10,459!
480
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
481
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
482
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
483
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
484
  }
485

486
  if (pCfg->statusInterval != tsStatusInterval) {
10,459!
487
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
488
           tsStatusInterval);
489
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
490
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
491
  }
492

493
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
10,459!
494
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
495
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
496
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
497
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
498
  }
499

500
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
10,459!
501
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
502
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
503
    return DND_REASON_LOCALE_NOT_MATCH;
×
504
  }
505

506
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
10,459!
507
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
508
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
509
    return DND_REASON_CHARSET_NOT_MATCH;
×
510
  }
511

512
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
10,459!
513
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
514
           tsTtlChangeOnWrite);
515
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
516
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
517
  }
518
  int8_t enable = tsEnableWhiteList ? 1 : 0;
10,459✔
519
  if (pCfg->enableWhiteList != enable) {
10,459!
520
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
521
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
522
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
523
  }
524

525
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
10,459!
526
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
10,459!
527
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
528
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
529
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
530
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
531
  }
532

533
  return DND_REASON_ONLINE;
10,459✔
534
}
535

536
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
148✔
537
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
148!
538
    return 0.0;
25✔
539
  }
540

541
  int64_t deltaCount = currentCount - lastCount;
123✔
542
  int64_t deltaMs = currentTimeMs - lastTimeMs;
123✔
543
  double  rate = (double)deltaCount / (double)deltaMs;
123✔
544
  return rate;
123✔
545
}
546

547
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
221,400✔
548
  bool stateChanged = false;
221,400✔
549
  bool roleChanged = pGid->syncState != pVload->syncState ||
660,414✔
550
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
428,481!
551
                     pGid->roleTimeMs != pVload->roleTimeMs;
207,081✔
552

553
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
221,400✔
554
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
249✔
555
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
101✔
556
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
148!
557
      int64_t currentTimeMs = taosGetTimestampMs();
148✔
558
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
148✔
559
                                          pGid->lastSyncAppliedIndexUpdateTime);
560

561
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
148✔
562
    }
563
  }
564

565
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
221,400✔
566
  pGid->syncCommitIndex = pVload->syncCommitIndex;
221,400✔
567
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
221,400✔
568
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
221,400✔
569
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
221,400!
570
      pGid->startTimeMs != pVload->startTimeMs) {
206,014!
571
    mInfo(
15,386!
572
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
573
        "canRead:%d, dnode:%d",
574
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
575
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
576
    pGid->syncState = pVload->syncState;
15,386✔
577
    pGid->syncTerm = pVload->syncTerm;
15,386✔
578
    pGid->syncRestore = pVload->syncRestore;
15,386✔
579
    pGid->syncCanRead = pVload->syncCanRead;
15,386✔
580
    pGid->startTimeMs = pVload->startTimeMs;
15,386✔
581
    pGid->roleTimeMs = pVload->roleTimeMs;
15,386✔
582
    stateChanged = true;
15,386✔
583
  }
584
  return stateChanged;
221,400✔
585
}
586

587
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
57,465✔
588
  bool stateChanged = false;
57,465✔
589
  bool roleChanged = pObj->syncState != pMload->syncState ||
170,202✔
590
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
112,686!
591
                     pObj->roleTimeMs != pMload->roleTimeMs;
55,221✔
592
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
57,465✔
593
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
2,319!
594
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
595
          pObj->syncTerm, pMload->syncTerm);
596
    pObj->syncState = pMload->syncState;
2,319✔
597
    pObj->syncTerm = pMload->syncTerm;
2,319✔
598
    pObj->syncRestore = pMload->syncRestore;
2,319✔
599
    pObj->roleTimeMs = pMload->roleTimeMs;
2,319✔
600
    stateChanged = true;
2,319✔
601
  }
602
  return stateChanged;
57,465✔
603
}
604

605
extern char   *tsMonFwUri;
606
extern char   *tsMonSlowLogUri;
607
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
269✔
608
  SMnode    *pMnode = pReq->info.node;
269✔
609
  SStatisReq statisReq = {0};
269✔
610
  int32_t    code = -1;
269✔
611

612
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
269!
613

614
  if (tsMonitorLogProtocol) {
269!
615
    mInfo("process statis req,\n %s", statisReq.pCont);
×
616
  }
617

618
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
269✔
619
    monSendContent(statisReq.pCont, tsMonFwUri);
248✔
620
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
21!
621
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
21✔
622
  }
623

624
  tFreeSStatisReq(&statisReq);
269✔
625
  return 0;
269✔
626
}
627

628
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
7,080✔
629
  mTrace("process audit req:%p", pReq);
7,080✔
630
  if (tsEnableAudit && tsEnableAuditDelete) {
7,080!
631
    SMnode   *pMnode = pReq->info.node;
7,080✔
632
    SAuditReq auditReq = {0};
7,080✔
633

634
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
7,080!
635

636
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
7,080✔
637

638
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
7,080✔
639
                   auditReq.sqlLen);
640

641
    tFreeSAuditReq(&auditReq);
7,080✔
642
  }
643
  return 0;
7,080✔
644
}
645

646
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
4,528✔
647
  int32_t       code = 0, lino = 0;
4,528✔
648
  SDnodeInfoReq infoReq = {0};
4,528✔
649
  int32_t       contLen = 0;
4,528✔
650
  void         *pReq = NULL;
4,528✔
651

652
  infoReq.dnodeId = pDnode->id;
4,528✔
653
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
4,528✔
654

655
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
4,528!
656
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
657
  }
658
  pReq = rpcMallocCont(contLen);
4,528✔
659
  if (pReq == NULL) {
4,528!
660
    TAOS_RETURN(terrno);
×
661
  }
662

663
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
4,528!
664
    code = contLen;
×
665
    goto _exit;
×
666
  }
667

668
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
4,528✔
669
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
4,528!
670
_exit:
4,528✔
671
  if (code < 0) {
4,528!
672
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
673
  }
674
  TAOS_RETURN(code);
4,528✔
675
}
676

677
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
4,526✔
678
  int32_t       code = 0, lino = 0;
4,526✔
679
  SMnode       *pMnode = pReq->info.node;
4,526✔
680
  SDnodeInfoReq infoReq = {0};
4,526✔
681
  SDnodeObj    *pDnode = NULL;
4,526✔
682
  STrans       *pTrans = NULL;
4,526✔
683
  SSdbRaw      *pCommitRaw = NULL;
4,526✔
684

685
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
4,526!
686

687
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
4,526✔
688
  if (pDnode == NULL) {
4,526!
689
    TAOS_CHECK_EXIT(terrno);
×
690
  }
691

692
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
4,526✔
693
  if (pTrans == NULL) {
4,526!
694
    TAOS_CHECK_EXIT(terrno);
×
695
  }
696

697
  pDnode->updateTime = taosGetTimestampMs();
4,526✔
698

699
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
4,526!
700
    TAOS_CHECK_EXIT(terrno);
×
701
  }
702
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
4,526!
703
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
704
    TAOS_CHECK_EXIT(code);
×
705
  }
706
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
4,526!
707
  pCommitRaw = NULL;
4,526✔
708

709
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
4,526!
710
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
711
    TAOS_CHECK_EXIT(code);
×
712
  }
713

714
_exit:
4,526✔
715
  mndReleaseDnode(pMnode, pDnode);
4,526✔
716
  if (code != 0) {
4,526!
717
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
×
718
  }
719
  mndTransDrop(pTrans);
4,526✔
720
  sdbFreeRaw(pCommitRaw);
4,526✔
721
  TAOS_RETURN(code);
4,526✔
722
}
723

724
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
89,491✔
725
  SMnode    *pMnode = pReq->info.node;
89,491✔
726
  SStatusReq statusReq = {0};
89,491✔
727
  SDnodeObj *pDnode = NULL;
89,491✔
728
  int32_t    code = -1;
89,491✔
729

730
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
89,491!
731

732
  int64_t clusterid = mndGetClusterId(pMnode);
89,491✔
733
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
89,491!
734
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
735
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
736
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
737
    goto _OVER;
×
738
  }
739

740
  if (statusReq.dnodeId == 0) {
89,491✔
741
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
3,608✔
742
    if (pDnode == NULL) {
3,608✔
743
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
1,035!
744
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
1,035✔
745
      if (terrno != 0) code = terrno;
1,035!
746
      goto _OVER;
1,035✔
747
    }
748
  } else {
749
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
85,883✔
750
    if (pDnode == NULL) {
85,883✔
751
      int32_t err = terrno;
345✔
752
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
345✔
753
      if (pDnode != NULL) {
345✔
754
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
6✔
755
        terrno = err;
6✔
756
        goto _OVER;
6✔
757
      }
758

759
      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
339!
760
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
339✔
761
        terrno = err;
91✔
762
        goto _OVER;
91✔
763
      } else {
764
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
248✔
765
        if (pDnode == NULL) goto _OVER;
248!
766
      }
767
    }
768
  }
769

770
  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
88,359✔
771

772
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
88,359✔
773
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
88,359✔
774
  int64_t curMs = taosGetTimestampMs();
88,359✔
775
  bool    online = mndIsDnodeOnline(pDnode, curMs);
88,359✔
776
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
88,359✔
777
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
88,359✔
778
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
88,359✔
779
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
88,359✔
780
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
88,359✔
781
  bool    analVerChanged = (analVer != statusReq.analVer);
88,359✔
782
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
85,499!
783
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
173,858!
784
  const STraceId *trace = &pReq->info.traceId;
88,359✔
785
  char            timestamp[TD_TIME_STR_LEN] = {0};
88,359✔
786
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
88,359✔
787
  mGTrace(
88,359!
788
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
789
      "timestamp:%s",
790
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
791

792
  if (reboot) {
88,359✔
793
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
2,965✔
794
  }
795

796
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
313,100✔
797
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
224,741✔
798

799
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
224,741✔
800
    if (pVgroup != NULL) {
224,741✔
801
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
221,478!
802
        pVgroup->cacheUsage = pVload->cacheUsage;
172,533✔
803
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
172,533✔
804
        pVgroup->numOfTables = pVload->numOfTables;
172,533✔
805
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
172,533✔
806
        pVgroup->totalStorage = pVload->totalStorage;
172,533✔
807
        pVgroup->compStorage = pVload->compStorage;
172,533✔
808
        pVgroup->pointsWritten = pVload->pointsWritten;
172,533✔
809
      }
810
      bool stateChanged = false;
221,478✔
811
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
288,877✔
812
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
288,799✔
813
        if (pGid->dnodeId == statusReq.dnodeId) {
288,799✔
814
          if (pVload->startTimeMs == 0) {
221,400!
815
            pVload->startTimeMs = statusReq.rebootTime;
×
816
          }
817
          if (pVload->roleTimeMs == 0) {
221,400!
818
            pVload->roleTimeMs = statusReq.rebootTime;
×
819
          }
820
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
221,400✔
821
          break;
221,400✔
822
        }
823
      }
824
      if (stateChanged) {
221,478✔
825
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,386✔
826
        if (pDb != NULL && pDb->stateTs != curMs) {
15,386✔
827
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
9,370!
828
                pDb->stateTs, curMs);
829
          pDb->stateTs = curMs;
9,370✔
830
        }
831
        mndReleaseDb(pMnode, pDb);
15,386✔
832
      }
833
    }
834

835
    mndReleaseVgroup(pMnode, pVgroup);
224,741✔
836
  }
837

838
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
88,359✔
839
  if (pObj != NULL) {
88,359✔
840
    if (statusReq.mload.roleTimeMs == 0) {
57,465✔
841
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
1,972✔
842
    }
843
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
57,465✔
844
    mndReleaseMnode(pMnode, pObj);
57,465✔
845
  }
846

847
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
88,359✔
848
  if (pQnode != NULL) {
88,359✔
849
    pQnode->load = statusReq.qload;
14,449✔
850
    mndReleaseQnode(pMnode, pQnode);
14,449✔
851
  }
852

853
  if (needCheck) {
88,359✔
854
    if (statusReq.sver != tsVersion) {
10,459!
855
      if (pDnode != NULL) {
×
856
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
857
      }
858
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
859
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
860
      goto _OVER;
×
861
    }
862

863
    if (statusReq.dnodeId == 0) {
10,459✔
864
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
2,573!
865
    } else {
866
      if (statusReq.clusterId != pMnode->clusterId) {
7,886!
867
        if (pDnode != NULL) {
×
868
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
869
        }
870
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
871
               pMnode->clusterId);
872
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
873
        goto _OVER;
×
874
      }
875
    }
876

877
    // Verify whether the cluster parameters are consistent when status change from offline to ready
878
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
10,459✔
879
    if (pDnode->offlineReason != 0) {
10,459!
880
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
×
881
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
×
882
      goto _OVER;
×
883
    }
884

885
    if (!online) {
10,459✔
886
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
2,860!
887
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
888
    } else {
889
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
7,599!
890
            statusReq.dnodeVer, dnodeVer, reboot);
891
    }
892

893
    pDnode->rebootTime = statusReq.rebootTime;
10,459✔
894
    pDnode->numOfCores = statusReq.numOfCores;
10,459✔
895
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
10,459✔
896
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
10,459✔
897
    pDnode->memAvail = statusReq.memAvail;
10,459✔
898
    pDnode->memTotal = statusReq.memTotal;
10,459✔
899
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
10,459✔
900
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
10,459✔
901
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
10,459✔
902
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
4,528✔
903
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
4,528!
904
        goto _OVER;
×
905
      }
906
    }
907

908
    SStatusRsp statusRsp = {0};
10,459✔
909
    statusRsp.statusSeq++;
10,459✔
910
    statusRsp.analVer = analVer;
10,459✔
911
    statusRsp.dnodeVer = dnodeVer;
10,459✔
912
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
10,459✔
913
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
10,459✔
914
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
10,459✔
915
    if (statusRsp.pDnodeEps == NULL) {
10,459!
916
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
917
      goto _OVER;
×
918
    }
919

920
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
10,459✔
921
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
10,459✔
922

923
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
10,459✔
924
    void   *pHead = rpcMallocCont(contLen);
10,459✔
925
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
10,459✔
926
    taosArrayDestroy(statusRsp.pDnodeEps);
10,459✔
927
    if (contLen < 0) {
10,459!
928
      code = contLen;
×
929
      goto _OVER;
×
930
    }
931

932
    pReq->info.rspLen = contLen;
10,459✔
933
    pReq->info.rsp = pHead;
10,459✔
934
  }
935

936
  pDnode->accessTimes++;
88,359✔
937
  pDnode->lastAccessTime = curMs;
88,359✔
938
  code = 0;
88,359✔
939

940
_OVER:
89,491✔
941
  mndReleaseDnode(pMnode, pDnode);
89,491✔
942
  taosArrayDestroy(statusReq.pVloads);
89,491✔
943
  return mndUpdClusterInfo(pReq);
89,491✔
944
}
945

946
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
947
  SMnode    *pMnode = pReq->info.node;
×
948
  SNotifyReq notifyReq = {0};
×
949
  int32_t    code = 0;
×
950

951
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
952
    terrno = code;
×
953
    goto _OVER;
×
954
  }
955

956
  int64_t clusterid = mndGetClusterId(pMnode);
×
957
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
958
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
959
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
960
          notifyReq.clusterId, clusterid, tstrerror(code));
961
    goto _OVER;
×
962
  }
963

964
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
965
  for (int32_t v = 0; v < nVgroup; ++v) {
×
966
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
967

968
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
969
    if (pVgroup != NULL) {
×
970
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
971
      mndReleaseVgroup(pMnode, pVgroup);
×
972
    }
973
  }
974
  code = mndUpdClusterInfo(pReq);
×
975
_OVER:
×
976
  tFreeSNotifyReq(&notifyReq);
×
977
  return code;
×
978
}
979

980
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
474✔
981
  int32_t  code = -1;
474✔
982
  SSdbRaw *pRaw = NULL;
474✔
983
  STrans  *pTrans = NULL;
474✔
984

985
  SDnodeObj dnodeObj = {0};
474✔
986
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
474✔
987
  dnodeObj.createdTime = taosGetTimestampMs();
474✔
988
  dnodeObj.updateTime = dnodeObj.createdTime;
474✔
989
  dnodeObj.port = pCreate->port;
474✔
990
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
474✔
991
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
474✔
992

993
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
474✔
994
  if (pTrans == NULL) {
474!
995
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
996
    if (terrno != 0) code = terrno;
×
997
    goto _OVER;
×
998
  }
999
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
474!
1000
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
474!
1001

1002
  pRaw = mndDnodeActionEncode(&dnodeObj);
474✔
1003
  if (pRaw == NULL) {
474!
1004
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1005
    if (terrno != 0) code = terrno;
×
1006
    goto _OVER;
×
1007
  }
1008
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
474!
1009
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
474!
1010
  pRaw = NULL;
474✔
1011

1012
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
474!
1013
  code = 0;
474✔
1014

1015
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
474✔
1016
                                   1);  // TODO: check the return value
1017
_OVER:
474✔
1018
  mndTransDrop(pTrans);
474✔
1019
  sdbFreeRaw(pRaw);
474✔
1020
  return code;
474✔
1021
}
1022

1023
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
212✔
1024
  SMnode       *pMnode = pReq->info.node;
212✔
1025
  SSdb         *pSdb = pMnode->pSdb;
212✔
1026
  SDnodeObj    *pObj = NULL;
212✔
1027
  void         *pIter = NULL;
212✔
1028
  SDnodeListRsp rsp = {0};
212✔
1029
  int32_t       code = -1;
212✔
1030

1031
  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
212✔
1032
  if (NULL == rsp.dnodeList) {
212!
1033
    mError("failed to alloc epSet while process dnode list req");
×
1034
    code = terrno;
×
1035
    goto _OVER;
×
1036
  }
1037

1038
  while (1) {
548✔
1039
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
760✔
1040
    if (pIter == NULL) break;
760✔
1041

1042
    SDNodeAddr dnodeAddr = {0};
548✔
1043
    dnodeAddr.nodeId = pObj->id;
548✔
1044
    dnodeAddr.epSet.numOfEps = 1;
548✔
1045
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
548✔
1046
    dnodeAddr.epSet.eps[0].port = pObj->port;
548✔
1047

1048
    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
1,096!
1049
      if (terrno != 0) code = terrno;
×
1050
      sdbRelease(pSdb, pObj);
×
1051
      sdbCancelFetch(pSdb, pIter);
×
1052
      goto _OVER;
×
1053
    }
1054

1055
    sdbRelease(pSdb, pObj);
548✔
1056
  }
1057

1058
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
212✔
1059
  void   *pRsp = rpcMallocCont(rspLen);
212✔
1060
  if (pRsp == NULL) {
212!
1061
    code = terrno;
×
1062
    goto _OVER;
×
1063
  }
1064

1065
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
212!
1066
    code = rspLen;
×
1067
    goto _OVER;
×
1068
  }
1069

1070
  pReq->info.rspLen = rspLen;
212✔
1071
  pReq->info.rsp = pRsp;
212✔
1072
  code = 0;
212✔
1073

1074
_OVER:
212✔
1075

1076
  if (code != 0) {
212!
1077
    mError("failed to get dnode list since %s", tstrerror(code));
×
1078
  }
1079

1080
  tFreeSDnodeListRsp(&rsp);
212✔
1081

1082
  TAOS_RETURN(code);
212✔
1083
}
1084

1085
void getSlowLogScopeString(int32_t scope, char *result) {
7✔
1086
  if (scope == SLOW_LOG_TYPE_NULL) {
7!
1087
    (void)strncat(result, "NONE", 64);
×
1088
    return;
×
1089
  }
1090
  while (scope > 0) {
14✔
1091
    if (scope & SLOW_LOG_TYPE_QUERY) {
7!
1092
      (void)strncat(result, "QUERY", 64);
7✔
1093
      scope &= ~SLOW_LOG_TYPE_QUERY;
7✔
1094
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1095
      (void)strncat(result, "INSERT", 64);
×
1096
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1097
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1098
      (void)strncat(result, "OTHERS", 64);
×
1099
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1100
    } else {
1101
      (void)printf("invalid slow log scope:%d", scope);
×
1102
      return;
×
1103
    }
1104

1105
    if (scope > 0) {
7!
1106
      (void)strncat(result, "|", 64);
×
1107
    }
1108
  }
1109
}
1110

1111
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
475✔
1112
  SMnode         *pMnode = pReq->info.node;
475✔
1113
  int32_t         code = -1;
475✔
1114
  SDnodeObj      *pDnode = NULL;
475✔
1115
  SCreateDnodeReq createReq = {0};
475✔
1116
  int32_t         lino = 0;
475✔
1117

1118
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
475!
1119
    goto _OVER;
×
1120
  }
1121

1122
  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
475✔
1123
  TAOS_CHECK_GOTO(code, &lino, _OVER);
475!
1124

1125
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
475!
1126
  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE);
475✔
1127
  TAOS_CHECK_GOTO(code, &lino, _OVER);
475✔
1128

1129
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
474!
1130
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1131
    goto _OVER;
×
1132
  }
1133
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
1134
  // if (code != 0) {
1135
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
1136
  //          tstrerror(code));
1137
  //   goto _OVER;
1138
  // }
1139

1140
  char ep[TSDB_EP_LEN];
1141
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
474✔
1142
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
474✔
1143
  if (pDnode != NULL) {
474!
1144
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1145
    goto _OVER;
×
1146
  }
1147

1148
  code = mndCreateDnode(pMnode, pReq, &createReq);
474✔
1149
  if (code == 0) {
474!
1150
    code = TSDB_CODE_ACTION_IN_PROGRESS;
474✔
1151
    tsGrantHBInterval = 5;
474✔
1152
  }
1153

1154
  char obj[200] = {0};
474✔
1155
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
474✔
1156

1157
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
474✔
1158

1159
_OVER:
475✔
1160
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
475!
1161
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
1!
1162
  }
1163

1164
  mndReleaseDnode(pMnode, pDnode);
475✔
1165
  tFreeSCreateDnodeReq(&createReq);
475✔
1166
  TAOS_RETURN(code);
475✔
1167
}
1168

1169
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1170

1171
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
10✔
1172

1173
#ifndef TD_ENTERPRISE
1174
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1175
#endif
1176

1177
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
24✔
1178
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1179
  int32_t  code = -1;
24✔
1180
  SSdbRaw *pRaw = NULL;
24✔
1181
  STrans  *pTrans = NULL;
24✔
1182
  int32_t  lino = 0;
24✔
1183

1184
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
24✔
1185
  if (pTrans == NULL) {
24!
1186
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1187
    if (terrno != 0) code = terrno;
×
1188
    goto _OVER;
×
1189
  }
1190
  mndTransSetGroupParallel(pTrans);
24✔
1191
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
24!
1192
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
24!
1193

1194
  pRaw = mndDnodeActionEncode(pDnode);
24✔
1195
  if (pRaw == NULL) {
24!
1196
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1197
    if (terrno != 0) code = terrno;
×
1198
    goto _OVER;
×
1199
  }
1200
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
24!
1201
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
24!
1202
  pRaw = NULL;
24✔
1203

1204
  pRaw = mndDnodeActionEncode(pDnode);
24✔
1205
  if (pRaw == NULL) {
24!
1206
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1207
    if (terrno != 0) code = terrno;
×
1208
    goto _OVER;
×
1209
  }
1210
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
24!
1211
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
24!
1212
  pRaw = NULL;
24✔
1213

1214
  if (pMObj != NULL) {
24✔
1215
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1216
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
4!
1217
  }
1218

1219
  if (pQObj != NULL) {
24✔
1220
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1221
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
3!
1222
  }
1223

1224
  if (pSObj != NULL) {
24✔
1225
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1226
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), &lino, _OVER);
3!
1227
  }
1228

1229
  if (pBObj != NULL) {
24!
1230
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1231
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), NULL, _OVER);
×
1232
  }
1233

1234
  if (numOfVnodes > 0) {
24✔
1235
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
14!
1236
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
14✔
1237
  }
1238

1239
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
23!
1240

1241
  code = 0;
23✔
1242

1243
_OVER:
24✔
1244
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
24!
1245
  mndTransDrop(pTrans);
24✔
1246
  sdbFreeRaw(pRaw);
24✔
1247
  TAOS_RETURN(code);
24✔
1248
}
1249

1250
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
1251
  bool       isEmpty = false;
×
1252
  SMnodeObj *pMObj = NULL;
×
1253
  SQnodeObj *pQObj = NULL;
×
1254
  SSnodeObj *pSObj = NULL;
×
1255

1256
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
1257
  if (pQObj) goto _OVER;
×
1258

1259
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
1260
  if (pSObj) goto _OVER;
×
1261

1262
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
1263
  if (pMObj) goto _OVER;
×
1264

1265
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
1266
  if (numOfVnodes > 0) goto _OVER;
×
1267

1268
  isEmpty = true;
×
1269
_OVER:
×
1270
  mndReleaseMnode(pMnode, pMObj);
×
1271
  mndReleaseQnode(pMnode, pQObj);
×
1272
  mndReleaseSnode(pMnode, pSObj);
×
1273
  return isEmpty;
×
1274
}
1275

1276
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
29✔
1277
  SMnode       *pMnode = pReq->info.node;
29✔
1278
  int32_t       code = -1;
29✔
1279
  SDnodeObj    *pDnode = NULL;
29✔
1280
  SMnodeObj    *pMObj = NULL;
29✔
1281
  SQnodeObj    *pQObj = NULL;
29✔
1282
  SSnodeObj    *pSObj = NULL;
29✔
1283
  SBnodeObj    *pBObj = NULL;
29✔
1284
  SDropDnodeReq dropReq = {0};
29✔
1285

1286
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
29!
1287

1288
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
29!
1289
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1290
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
29✔
1291

1292
  bool force = dropReq.force;
28✔
1293
  if (dropReq.unsafe) {
28✔
1294
    force = true;
1✔
1295
  }
1296

1297
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
28✔
1298
  if (pDnode == NULL) {
28!
1299
    int32_t err = terrno;
×
1300
    char    ep[TSDB_EP_LEN + 1] = {0};
×
1301
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
1302
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
1303
    if (pDnode == NULL) {
×
1304
      code = err;
×
1305
      goto _OVER;
×
1306
    }
1307
  }
1308

1309
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
28✔
1310
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
28✔
1311
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
28✔
1312
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
28✔
1313
  if (pMObj != NULL) {
28✔
1314
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
8✔
1315
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
1✔
1316
      goto _OVER;
1✔
1317
    }
1318
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
7✔
1319
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
2✔
1320
      goto _OVER;
2✔
1321
    }
1322
  }
1323

1324
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
25✔
1325
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
25✔
1326

1327
  if (isonline && force) {
25!
1328
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1329
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1330
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1331
    goto _OVER;
×
1332
  }
1333

1334
  bool    vnodeOffline = false;
25✔
1335
  void   *pIter = NULL;
25✔
1336
  int32_t vgId = -1;
25✔
1337
  while (1) {
37✔
1338
    SVgObj *pVgroup = NULL;
62✔
1339
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
62✔
1340
    if (pIter == NULL) break;
62✔
1341

1342
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
118✔
1343
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
81✔
1344
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
30✔
1345
          vgId = pVgroup->vgId;
3✔
1346
          vnodeOffline = true;
3✔
1347
          break;
3✔
1348
        }
1349
      }
1350
    }
1351

1352
    sdbRelease(pMnode->pSdb, pVgroup);
40✔
1353

1354
    if (vnodeOffline) {
40✔
1355
      sdbCancelFetch(pMnode->pSdb, pIter);
3✔
1356
      break;
3✔
1357
    }
1358
  }
1359

1360
  if (vnodeOffline && !force) {
25✔
1361
    code = TSDB_CODE_VND_VNODE_OFFLINE;
1✔
1362
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
1!
1363
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1364
    goto _OVER;
1✔
1365
  }
1366

1367
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
24✔
1368
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
24✔
1369

1370
  char obj1[30] = {0};
24✔
1371
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
24✔
1372

1373
  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
24✔
1374

1375
_OVER:
29✔
1376
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
29!
1377
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
6!
1378
  }
1379

1380
  mndReleaseDnode(pMnode, pDnode);
29✔
1381
  mndReleaseMnode(pMnode, pMObj);
29✔
1382
  mndReleaseQnode(pMnode, pQObj);
29✔
1383
  mndReleaseSnode(pMnode, pSObj);
29✔
1384
  tFreeSDropDnodeReq(&dropReq);
29✔
1385
  TAOS_RETURN(code);
29✔
1386
}
1387

1388
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
1✔
1389
  int32_t code = 0;
1✔
1390
  SMnode *pMnode = pReq->info.node;
1✔
1391
  SSdb   *pSdb = pMnode->pSdb;
1✔
1392
  void   *pIter = NULL;
1✔
1393
  int8_t  encrypting = 0;
1✔
1394

1395
  const STraceId *trace = &pReq->info.traceId;
1✔
1396

1397
  int32_t klen = strlen(pDcfgReq->value);
1✔
1398
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
1!
1399
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1400
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1401
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1402
    goto _exit;
×
1403
  }
1404

1405
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
1!
1406
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1407
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1408
    goto _exit;
×
1409
  }
1410

1411
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
1!
1412
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1413
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1414
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1415
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1416
    goto _exit;
×
1417
  }
1418

1419
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
1✔
1420
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
1✔
1421
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
1✔
1422

1423
  while (1) {
1✔
1424
    SDnodeObj *pDnode = NULL;
2✔
1425
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
2✔
1426
    if (pIter == NULL) break;
2✔
1427
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
1!
1428
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1429
             offlineReason[pDnode->offlineReason]);
1430
      sdbRelease(pSdb, pDnode);
×
1431
      continue;
×
1432
    }
1433

1434
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
1!
1435
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
1✔
1436
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
1✔
1437
      void   *pBuf = rpcMallocCont(bufLen);
1✔
1438

1439
      if (pBuf != NULL) {
1!
1440
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
1!
1441
          code = bufLen;
×
1442
          sdbRelease(pSdb, pDnode);
×
1443
          goto _exit;
×
1444
        }
1445
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
1✔
1446
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
1!
1447
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
1✔
1448
        }
1449
      }
1450
    }
1451

1452
    sdbRelease(pSdb, pDnode);
1✔
1453
  }
1454

1455
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
1!
1456
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1457
  }
1458

1459
_exit:
1✔
1460
  if (code != 0) {
1!
1461
    if (terrno == 0) terrno = code;
×
1462
  }
1463
  return code;
1✔
1464
}
1465

1466
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
1✔
1467
  int32_t code = 0;
1✔
1468

1469
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
1470
  SMnode       *pMnode = pReq->info.node;
1✔
1471
  SMCfgDnodeReq cfgReq = {0};
1✔
1472
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
1!
1473

1474
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
1!
1475
    tFreeSMCfgDnodeReq(&cfgReq);
×
1476
    TAOS_RETURN(code);
×
1477
  }
1478
  const STraceId *trace = &pReq->info.traceId;
1✔
1479
  SDCfgDnodeReq   dcfgReq = {0};
1✔
1480
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
1!
1481
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
1✔
1482
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
1✔
1483
    tFreeSMCfgDnodeReq(&cfgReq);
1✔
1484
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
1✔
1485
  } else {
1486
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1487
    tFreeSMCfgDnodeReq(&cfgReq);
×
1488
    TAOS_RETURN(code);
×
1489
  }
1490

1491
#else
1492
  TAOS_RETURN(code);
1493
#endif
1494
}
1495

1496
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
1✔
1497
  SMnode *pMnode = pRsp->info.node;
1✔
1498
  int16_t nSuccess = 0;
1✔
1499
  int16_t nFailed = 0;
1✔
1500

1501
  if (0 == pRsp->code) {
1!
1502
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
1✔
1503
  } else {
1504
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1505
  }
1506

1507
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
1✔
1508
  bool    finished = nSuccess + nFailed >= nReq;
1✔
1509

1510
  if (finished) {
1!
1511
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
1✔
1512
  }
1513

1514
  const STraceId *trace = &pRsp->info.traceId;
1✔
1515
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
1!
1516
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1517

1518
  return 0;
1✔
1519
}
1520

1521
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
7✔
1522
  SMnode *pMnode = pReq->info.node;
7✔
1523
  int32_t totalRows = 0;
7✔
1524
  int32_t numOfRows = 0;
7✔
1525
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
7✔
1526
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
7✔
1527
  char   *pWrite = NULL;
7✔
1528
  int32_t cols = 0;
7✔
1529
  int32_t code = 0;
7✔
1530
  int32_t lino = 0;
7✔
1531

1532
  cfgOpts[totalRows] = "statusInterval";
7✔
1533
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
7✔
1534
  totalRows++;
7✔
1535

1536
  cfgOpts[totalRows] = "timezone";
7✔
1537
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
7✔
1538
  totalRows++;
7✔
1539

1540
  cfgOpts[totalRows] = "locale";
7✔
1541
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
7✔
1542
  totalRows++;
7✔
1543

1544
  cfgOpts[totalRows] = "charset";
7✔
1545
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
7✔
1546
  totalRows++;
7✔
1547

1548
  cfgOpts[totalRows] = "monitor";
7✔
1549
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
7✔
1550
  totalRows++;
7✔
1551

1552
  cfgOpts[totalRows] = "monitorInterval";
7✔
1553
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
7✔
1554
  totalRows++;
7✔
1555

1556
  cfgOpts[totalRows] = "slowLogThreshold";
7✔
1557
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
7✔
1558
  totalRows++;
7✔
1559

1560
  cfgOpts[totalRows] = "slowLogMaxLen";
7✔
1561
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
7✔
1562
  totalRows++;
7✔
1563

1564
  char scopeStr[64] = {0};
7✔
1565
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
7✔
1566
  cfgOpts[totalRows] = "slowLogScope";
7✔
1567
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
7✔
1568
  totalRows++;
7✔
1569

1570
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1571
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1572

1573
  for (int32_t i = 0; i < totalRows; i++) {
70✔
1574
    cols = 0;
63✔
1575

1576
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
63✔
1577
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1578
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
63!
1579

1580
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
63✔
1581
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1582
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
63!
1583

1584
    numOfRows++;
63✔
1585
  }
1586

1587
_OVER:
7✔
1588
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
7!
1589
  pShow->numOfRows += numOfRows;
7✔
1590
  return numOfRows;
7✔
1591
}
1592

1593
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1594

1595
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
1,040✔
1596
  SMnode    *pMnode = pReq->info.node;
1,040✔
1597
  SSdb      *pSdb = pMnode->pSdb;
1,040✔
1598
  int32_t    numOfRows = 0;
1,040✔
1599
  int32_t    cols = 0;
1,040✔
1600
  ESdbStatus objStatus = 0;
1,040✔
1601
  SDnodeObj *pDnode = NULL;
1,040✔
1602
  int64_t    curMs = taosGetTimestampMs();
1,040✔
1603
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
1604
  int32_t    code = 0;
1,040✔
1605
  int32_t    lino = 0;
1,040✔
1606

1607
  while (numOfRows < rows) {
4,857!
1608
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
4,857✔
1609
    if (pShow->pIter == NULL) break;
4,857✔
1610
    bool online = mndIsDnodeOnline(pDnode, curMs);
3,817✔
1611

1612
    cols = 0;
3,817✔
1613

1614
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1615
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
3,817!
1616

1617
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
3,817✔
1618

1619
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1620
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
3,817!
1621

1622
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1623
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
3,817✔
1624
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
3,817!
1625

1626
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1627
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
3,817!
1628
                        &lino, _OVER);
1629

1630
    const char *status = "ready";
3,817✔
1631
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
3,817!
1632
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
3,817!
1633
    if (!online) {
3,817✔
1634
      if (objStatus == SDB_STATUS_CREATING)
517!
1635
        status = "creating*";
×
1636
      else if (objStatus == SDB_STATUS_DROPPING)
517!
1637
        status = "dropping*";
×
1638
      else
1639
        status = "offline";
517✔
1640
    }
1641

1642
    STR_TO_VARSTR(buf, status);
3,817✔
1643
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1644
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
3,817!
1645

1646
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1647
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
3,817!
1648
                        _OVER);
1649

1650
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1651
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
3,817!
1652
                        _OVER);
1653

1654
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
3,817!
1655
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
3,817✔
1656

1657
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1658
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
3,817!
1659
    taosMemoryFreeClear(b);
3,817!
1660

1661
#ifdef TD_ENTERPRISE
1662
    STR_TO_VARSTR(buf, pDnode->machineId);
3,817✔
1663
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,817✔
1664
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
3,817!
1665
#endif
1666

1667
    numOfRows++;
3,817✔
1668
    sdbRelease(pSdb, pDnode);
3,817✔
1669
  }
1670

1671
_OVER:
×
1672
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
1,040!
1673

1674
  pShow->numOfRows += numOfRows;
1,040✔
1675
  return numOfRows;
1,040✔
1676
}
1677

1678
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1679
  SSdb *pSdb = pMnode->pSdb;
×
1680
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1681
}
×
1682

1683
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
2,900✔
1684
  SDnodeObj *pObj = NULL;
2,900✔
1685
  void      *pIter = NULL;
2,900✔
1686
  SSdb      *pSdb = pMnode->pSdb;
2,900✔
1687
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
2,900✔
1688
  while (1) {
2,602✔
1689
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
5,502✔
1690
    if (pIter == NULL) break;
5,502✔
1691

1692
    char *fqdn = taosStrdup(pObj->fqdn);
2,602!
1693
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
2,602!
1694
      mError("failed to fqdn into array, but continue at this time");
×
1695
    }
1696
    sdbRelease(pSdb, pObj);
2,602✔
1697
  }
1698
  return fqdns;
2,900✔
1699
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc