• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4876

10 Dec 2025 05:56AM UTC coverage: 64.632% (+0.2%) from 64.472%
#4876

push

travis-ci

guanshengliang
test: fix idmp case with checkDataMemLoop checked (#33862)

4 of 9 new or added lines in 3 files covered. (44.44%)

380 existing lines in 104 files now uncovered.

162866 of 251990 relevant lines covered (64.63%)

107950382.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.93
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
// Human-readable text for each dnode offline reason. The trailing comment on
// every entry is its position in the table.
// NOTE(review): presumably indexed by the DND_REASON_* codes used elsewhere in
// this file - confirm the order matches that enum before adding entries.
static const char *offlineReason[] = {
    "",                                        // 0
    "status msg timeout",                      // 1
    "status not received",                     // 2
    "version not match",                       // 3
    "dnodeId not match",                       // 4
    "clusterId not match",                     // 5
    "statusInterval not match",                // 6
    "timezone not match",                      // 7
    "locale not match",                        // 8
    "charset not match",                       // 9
    "ttlChangeOnWrite not match",              // 10
    "enableWhiteList not match",               // 11
    "encryptionKey not match",                 // 12
    "monitor not match",                       // 13
    "monitor switch not match",                // 14
    "monitor interval not match",              // 15
    "monitor slow log threshold not match",    // 16
    "monitor slow log sql max len not match",  // 17
    "monitor slow log scope not match",        // 18
    "unknown",                                 // 19
};
63

64
// Selector for the two kinds of active code.
// NOTE(review): not referenced in the visible part of this file - confirm usage.
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
68

69
// Dnode operation kinds.
// NOTE(review): not referenced in the visible part of this file - confirm usage.
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
#ifdef _GRANT
100
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
101
#else
102
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
103
#endif
104

105
/*
 * Wire the dnode module into the mnode: register the request handlers, the
 * show-table retrieve/free-iterator handlers, and finally the SDB_DNODE table
 * callbacks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  // show-table handlers
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // sdb table descriptor for dnode objects, keyed by an int32 id
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
136

137
/* Counterpart of mndInitDnode(); this body does nothing. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
138

139
/*
 * Deploy callback of the dnode table: builds dnode id 1 from the local
 * fqdn/port (tsLocalFqdn, tsServerPort) and commits it through a
 * "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise the failing step's error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  STrans  *pTrans = NULL;
  SSdbRaw *pRaw = NULL;
  char    *machineId = NULL;

  const int64_t nowMs = taosGetTimestampMs();
  SDnodeObj     dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = nowMs;
  dnodeObj.updateTime = nowMs;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId == NULL) {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // this build configuration refuses to deploy without a machine id
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  } else {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // pRaw is cleared so the cleanup below does not free it after it was appended
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
190

191
/*
 * Serialise a dnode object into a freshly allocated sdb raw record.
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved
 * bytes, then two zero int16 values kept for forward/backward compatibility.
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_SET_*
  // macros reference them - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // stays set unless every step below succeeds
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
223

224
/*
 * Rebuild a dnode row from an sdb raw record written by mndDnodeActionEncode().
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 the two
 * trailing length-prefixed fields are read and discarded. After a successful
 * decode, tmsgUpdateDnodeInfo() is called with the row's id, fqdn and port.
 *
 * Returns the row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_GET_*
  // macros reference them - confirm before removing.
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // stays set unless every step below succeeds
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // two length-prefixed fields whose payload is skipped (NULL destination)
    int16_t skipLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
275

276
/*
 * Insert callback: marks the dnode as "status not received" and derives its
 * endpoint string ("fqdn:port") from the stored fqdn and port. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // format into a scratch buffer first, then copy into the object
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);
  return 0;
}
285

286
/* Delete callback: only traces the event. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
290

291
/*
 * Update callback: copies updateTime from the new row into the existing one and,
 * in TD_ENTERPRISE / TD_ASTRA_TODO builds, the machine id as well.
 * Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
299

300
/*
 * Acquire the dnode object with the given id from the sdb.
 *
 * Returns the object (release it with mndReleaseDnode()), or NULL with terrno
 * translated from the sdb error into the matching TSDB_CODE_MND_DNODE_* code.
 * Any other sdb error is reported as TSDB_CODE_APP_ERROR.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    // Keep the unexpected sdb error so the log still shows the real cause
    // after terrno is replaced by the generic application error.
    int32_t sdbErr = terrno;
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("dnode:%d, failed to acquire dnode since %s, sdb error:%s", dnodeId, terrstr(), tstrerror(sdbErr));
  }

  return NULL;
}
318

319
/* Release a dnode object back to the mnode's sdb via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
323

324
/*
 * Build an endpoint set holding the dnode's fqdn and port.
 * terrno is set to the result of addEpIntoEpSet().
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};

  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);

  return result;
}
329

330
/*
 * Look up a dnode by id and return its endpoint set.
 * If the dnode cannot be acquired, a zeroed endpoint set is returned.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     epSet = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return epSet;
}
340

341
/*
 * Scan the dnodes returned by sdbFetch() for one whose endpoint matches pEpStr
 * (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * Returns the matching object still acquired (the caller releases it), or NULL
 * with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when the scan ends without a match.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate);
    if (pIter == NULL) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // match: stop the iteration early and hand the object to the caller
    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }
}
361

362
/*
 * Same endpoint lookup as mndAcquireDnodeByEp(), but iterating with
 * sdbFetchAll().
 *
 * Returns the matching object still acquired, or NULL if none matches
 * (terrno is not touched here).
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus status = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &status, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }
}
382

383
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
387

388
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
392

393
/*
 * A dnode counts as online when the absolute distance between its last access
 * time and curMs does not exceed tsStatusTimeoutMs.
 *
 * Side effect: when the dnode is found offline, has a positive rebootTime and
 * is still marked DND_REASON_ONLINE, its offline reason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsed = TABS(pDnode->lastAccessTime - curMs);
  if (elapsed <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
403

404
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * A failed array push is logged and the scan continues, so the output may miss
 * entries when memory is short.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // copy everything needed out of the object before releasing it
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release: use the copied id
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
430

431
/*
 * Append one SDnodeInfo (id, fqdn, port, offline reason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * Returns 0 on success. If an array push fails, the iteration is cancelled and
 * terrno is returned; entries pushed before the failure remain in the array.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // zero-initialise so fields not assigned below never carry stack garbage
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release: use the copied id
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
463

464
// Compare one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
// with the local value of the same name. On mismatch: log, set terrno to `err`
// and return `err` from the enclosing function.
// Expects `pCfg` and `pDnode` to be in scope at the expansion site. Wrapped in
// do { } while (0) so it behaves as a single statement (safe in if/else).
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = err;                                                                                        \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
470

471
/*
 * Compare the cluster configuration reported by a dnode (pCfg) with the local
 * settings. Checks run in a fixed order and stop at the first mismatch.
 *
 * Returns DND_REASON_ONLINE when everything matches, otherwise the
 * DND_REASON_* value of the first mismatch, with terrno set as well.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // monitor parameters (each macro returns from this function on mismatch)
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  // a timezone difference is only rejected when checkTime differs as well
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) {
    if (pMnode->checkTime != pCfg->checkTime) {
      mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
             pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
      terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
      return DND_REASON_TIME_ZONE_NOT_MATCH;
    }
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // normalise the local switch to 0/1 before comparing
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // the encryption key comparison is skipped while the encrypting flag is set
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
536

537
/*
 * Applied-count growth per millisecond between two samples.
 *
 * Returns 0.0 unless both the time and the count strictly increased; otherwise
 * (currentCount - lastCount) / (currentTimeMs - lastTimeMs) as a double.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs <= lastTimeMs) {
    return 0.0;
  }
  if (currentCount <= lastCount) {
    return 0.0;
  }

  const double countDelta = (double)(currentCount - lastCount);
  const double elapsedMs = (double)(currentTimeMs - lastTimeMs);
  return countDelta / elapsedMs;
}
547

548
/*
 * Fold a vnode load report (pVload) into the vgroup member record (pGid).
 *
 * The applied/commit indexes and buffer-segment numbers are always copied.
 * While the commit index is ahead of the applied index, the applied rate and
 * its timestamp are refreshed. Sync state, term, restore/canRead flags and the
 * start/role times are copied only when at least one of them differs.
 *
 * Returns true when that second group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // evaluated before any field of pGid is modified
  const bool stateDiffers = pGid->syncState != pVload->syncState;
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleTimeDiffers = pGid->roleTimeMs != pVload->roleTimeMs;
  const bool roleChanged = stateDiffers || termDiffers || roleTimeDiffers;

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // first sample: only remember when it was taken
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate =
          calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  if (!roleChanged && pGid->syncRestore == pVload->syncRestore && pGid->syncCanRead == pVload->syncCanRead &&
      pGid->startTimeMs == pVload->startTimeMs) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
587

588
/*
 * Fold an mnode load report (pMload) into the mnode object (pObj).
 *
 * Sync state, term, restore flag and role time are copied when the state, the
 * term (ignored when the reported term is -1), the role time or the restore
 * flag differs.
 *
 * Returns true when the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool stateDiffers = pObj->syncState != pMload->syncState;
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleTimeDiffers = pObj->roleTimeMs != pMload->roleTimeMs;
  const bool roleChanged = stateDiffers || termDiffers || roleTimeDiffers;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
605

606
extern char   *tsMonFwUri;
607
extern char   *tsMonSlowLogUri;
608
/*
 * Handle TDMT_MND_STATIS: deserialise the request and pass its content to
 * monSendContent() - counters go to tsMonFwUri, slow logs to tsMonSlowLogUri.
 * Other types are ignored.
 *
 * Returns the deserialisation error if there is one, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  switch (statisReq.type) {
    case MONITOR_TYPE_COUNTER:
      monSendContent(statisReq.pCont, tsMonFwUri);
      break;
    case MONITOR_TYPE_SLOW_LOG:
      monSendContent(statisReq.pCont, tsMonSlowLogUri);
      break;
    default:
      break;
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
628

629
/*
 * Handle TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserialise the request and add it as an audit record; otherwise the
 * message is ignored.
 *
 * Returns the deserialisation error if there is one, otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || !tsEnableAuditDelete) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
646

647
/*
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message carrying the dnode's id and
 * machine id onto the mnode write queue.
 *
 * Returns 0 on success, or a negative error code when serialisation,
 * allocation or enqueueing fails.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // first pass with a NULL buffer only computes the encoded length
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Never report success for an empty encode, and give the buffer back:
    // nothing has taken ownership of it yet.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): the buffer is not freed here when enqueueing fails; presumably
  // tmsgPutToQueue releases pCont on failure - confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
677

678
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the dnode's updateTime and persist
 * the object through an "update-dnode-obj" transaction.
 *
 * Returns 0 on success, otherwise the error code of the failing step (also
 * stored in terrno by TAOS_RETURN).
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // For each NULL result below, always leave with a non-zero code - even if the
  // callee did not set terrno - so the NULL pointer is never dereferenced.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // pCommitRaw is cleared so the cleanup below does not free it after it was appended
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
724

725
/*
 * Handle the periodic status message sent by a dnode.
 *
 * Steps, in order:
 *   1. Deserialize the request and reject it if it carries a non-zero cluster
 *      id different from this cluster's id.
 *   2. Resolve the dnode object, by id or (for id 0 / id mismatch) by endpoint.
 *   3. Reject the message when its timestamp differs from the local clock by
 *      tsTimestampDeltaLimit seconds or more.
 *   4. Copy the reported vnode / mnode / qnode loads into the in-memory objects.
 *   5. When a check is required (dnode offline, rebooted, versions or white
 *      lists changed, ...), validate version, cluster id and cluster config,
 *      refresh the dnode attributes and build a status response containing the
 *      dnode endpoint list.
 *   6. Bump the access counters of the dnode.
 *
 * NOTE(review): the locally tracked `code` is not returned; the result of this
 * function is whatever mndUpdClusterInfo(pReq) returns. Several failure paths
 * only set `terrno`. Confirm with the callers that this is intended.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // A dnode that has not been assigned an id yet identifies itself by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      // Remember the lookup error: the endpoint lookup below may overwrite terrno.
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but registered under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged;
  // `trace` is kept under this exact name; presumably the mGTrace macro picks it up implicitly.
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Compare clocks at second granularity. The absolute value is computed in
  // 64 bits explicitly: labs() takes a `long`, which is 32 bits wide on some
  // platforms and would truncate the difference.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  int64_t absDelta = (delta < 0) ? -delta : delta;
  if (absDelta >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%" PRId64 " since %s, limit %" PRId64 "s", statusReq.dnodeId,
           pMnode->clusterId, tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }

  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only the (assigned) leader's statistics are taken as the vgroup's statistics.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role times fall back to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: copy it and hand the object to mndUpdateDnodeObj.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    // First pass computes the encoded size; bail out before allocating if it fails.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Never publish a NULL response buffer with a non-zero length.
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was not handed to the request yet, so release it here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
962

963
/*
 * Handle a notify message from a dnode.
 *
 * Copies the reported time-series count of every vnode load entry into the
 * matching vgroup object, then calls mndUpdClusterInfo(). A message carrying
 * a non-zero cluster id that differs from the local one is rejected.
 *
 * Returns the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER, or the
 * result of mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq req = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t localClusterId = mndGetClusterId(pMnode);
  if (req.clusterId != 0 && req.clusterId != localClusterId) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", req.dnodeId,
          req.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(req.pVloads);
  for (int32_t idx = 0; idx < numOfLoads; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(req.pVloads, idx);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (NULL == pVg) {
      // Entries whose vgroup cannot be acquired are skipped.
      continue;
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&req);
  return code;
}
996

997
/*
 * Create a new dnode object and persist it through a "create-dnode"
 * transaction (rollback policy, global conflict check).
 *
 * The new object gets the next id from the dnode table, the current time as
 * both creation and update time, and the fqdn/port taken from pCreate.
 *
 * Returns 0 when the transaction was prepared, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTxn = NULL;
  SSdbRaw  *pEncoded = NULL;
  SDnodeObj newDnode = {0};

  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTxn = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (NULL == pTxn) {
    // Prefer the specific error left in terrno, fall back to the generic code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTxn->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTxn), NULL, _OVER);

  pEncoded = mndDnodeActionEncode(&newDnode);
  if (NULL == pEncoded) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTxn, pEncoded), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pEncoded, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup path does not free the raw after it was appended.
  pEncoded = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTxn), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTxn);
  sdbFreeRaw(pEncoded);
  return code;
}
1037

1038
/*
 * Build the response for a dnode-list request.
 *
 * Iterates over every dnode in the sdb, collects its id and a single-endpoint
 * epSet (fqdn + port), serializes the list and attaches it to pReq->info.
 *
 * Returns 0 on success, otherwise an error code. On failure no response
 * buffer is attached to the request and any buffer allocated here is freed.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: release the current object and the iterator.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded size; do not allocate with a negative size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // ownership moved to the request; must not be freed below
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  // Still non-NULL only when the buffer was allocated but never attached to the request.
  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1099

1100
void getSlowLogScopeString(int32_t scope, char *result) {
2,492✔
1101
  if (scope == SLOW_LOG_TYPE_NULL) {
2,492✔
1102
    (void)strncat(result, "NONE", 64);
×
1103
    return;
×
1104
  }
1105
  while (scope > 0) {
4,984✔
1106
    if (scope & SLOW_LOG_TYPE_QUERY) {
2,492✔
1107
      (void)strncat(result, "QUERY", 64);
2,492✔
1108
      scope &= ~SLOW_LOG_TYPE_QUERY;
2,492✔
1109
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1110
      (void)strncat(result, "INSERT", 64);
×
1111
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1112
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1113
      (void)strncat(result, "OTHERS", 64);
×
1114
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1115
    } else {
1116
      (void)printf("invalid slow log scope:%d", scope);
×
1117
      return;
×
1118
    }
1119

1120
    if (scope > 0) {
2,492✔
1121
      (void)strncat(result, "|", 64);
×
1122
    }
1123
  }
1124
}
1125

1126
/*
 * Handle a CREATE DNODE request.
 *
 * Checks the dnode and cpu-core grants, deserializes the request, verifies the
 * caller's privilege and the endpoint (non-empty fqdn, port in 1..UINT16_MAX),
 * refuses an endpoint that already exists and finally starts the creation
 * transaction. An audit record is written once the creation was attempted.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pExisting = NULL;
  int32_t         lino = 0;
  int32_t         code = -1;

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), &lino, _OVER);

  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  // An endpoint that can already be acquired must not be created twice.
  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pExisting = mndAcquireDnodeByEp(pMnode, ep);
  if (NULL != pExisting) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  char obj[200] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pExisting);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1183

1184
// Implementation hook for restore-dnode requests. When TD_ENTERPRISE is not
// defined the stub below is used; otherwise the definition is presumably
// supplied by another translation unit (verify against the build).
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests: forwards to the implementation hook
// and returns its result unchanged.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  int32_t result = mndProcessRestoreDnodeReqImpl(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1191

1192
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
12,915✔
1193
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1194
  int32_t  code = -1;
12,915✔
1195
  SSdbRaw *pRaw = NULL;
12,915✔
1196
  STrans  *pTrans = NULL;
12,915✔
1197
  int32_t  lino = 0;
12,915✔
1198

1199
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
12,915✔
1200
  if (pTrans == NULL) {
12,915✔
1201
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1202
    if (terrno != 0) code = terrno;
×
1203
    goto _OVER;
×
1204
  }
1205
  mndTransSetGroupParallel(pTrans);
12,915✔
1206
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
12,915✔
1207
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
12,915✔
1208

1209
  pRaw = mndDnodeActionEncode(pDnode);
12,915✔
1210
  if (pRaw == NULL) {
12,915✔
1211
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1212
    if (terrno != 0) code = terrno;
×
1213
    goto _OVER;
×
1214
  }
1215
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
12,915✔
1216
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
12,915✔
1217
  pRaw = NULL;
12,915✔
1218

1219
  pRaw = mndDnodeActionEncode(pDnode);
12,915✔
1220
  if (pRaw == NULL) {
12,915✔
1221
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1222
    if (terrno != 0) code = terrno;
×
1223
    goto _OVER;
×
1224
  }
1225
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
12,915✔
1226
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
12,915✔
1227
  pRaw = NULL;
12,915✔
1228

1229
  if (pSObj != NULL) {
12,915✔
1230
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,135✔
1231
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
1,135✔
1232
  }
1233

1234
  if (pMObj != NULL) {
12,915✔
1235
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
629✔
1236
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
629✔
1237
  }
1238

1239
  if (pQObj != NULL) {
12,915✔
1240
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
346✔
1241
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
346✔
1242
  }
1243

1244
  if (pBObj != NULL) {
12,915✔
1245
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,299✔
1246
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
1,299✔
1247
  }
1248

1249
  if (numOfVnodes > 0) {
11,616✔
1250
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,419✔
1251
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,419✔
1252
  }
1253

1254
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
11,616✔
1255

1256
  code = 0;
11,616✔
1257

1258
_OVER:
12,915✔
1259
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
12,915✔
1260
  mndTransDrop(pTrans);
12,915✔
1261
  sdbFreeRaw(pRaw);
12,915✔
1262
  TAOS_RETURN(code);
12,915✔
1263
}
1264

1265
/*
 * Report whether the dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The checks run in that order and stop at the first hit.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SMnodeObj *pMnodeObj = NULL;
  SSnodeObj *pSnodeObj = NULL;
  SQnodeObj *pQnodeObj = mndAcquireQnode(pMnode, dnodeId);

  if (NULL == pQnodeObj) {
    pSnodeObj = mndAcquireSnode(pMnode, dnodeId);
    if (NULL == pSnodeObj) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (NULL == pMnodeObj) {
        // Only the vnode count is left to decide.
        empty = !(mndGetVnodesNum(pMnode, dnodeId) > 0);
      }
    }
  }

  // Release whatever was acquired; objects that were not acquired are still NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnodeObj);
  mndReleaseSnode(pMnode, pSnodeObj);
  return empty;
}
1290

1291
/*
 * Handle a DROP DNODE request.
 *
 * Steps, in order:
 *   1. Deserialize the request and check the caller's privilege.
 *   2. Resolve the dnode by id; if that fails, fall back to the "fqdn:port"
 *      endpoint carried in the request.
 *   3. Refuse to drop the last mnode or the mnode running on this node.
 *   4. (USE_MOUNT only) refuse when the dnode still has mounts, unless forced.
 *   5. Refuse a forced drop of an online dnode, and a non-forced drop when the
 *      dnode or one of its vnodes is offline.
 *   6. Start the drop transaction and write an audit record.
 *
 * `unsafe` implies `force`.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although this
  // handler drops a dnode; confirm whether a dnode-specific operation was intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Remember the lookup error: the endpoint lookup below may overwrite terrno.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a literal format string. The request-supplied fqdn
    // must never be used as the format itself: any '%' in it would be
    // interpreted as a conversion, and the port would never be appended.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // NOTE(review): the co-located nodes are looked up by dropReq.dnodeId, which may
  // differ from pDnode->id when the dnode was resolved by endpoint above - confirm.
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // NOTE(review): this and the per-vnode message below are logged at error level
  // although they report no failure; consider lowering the level.
  mError("vnode num:%d", numOfVnodes);

  // Scan all vgroups for a replica on this dnode that is in the OFFLINE sync state.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early, so the iterator has to be cancelled.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1421

1422
/**
 * Send a TDMT_DND_CREATE_ENCRYPT_KEY request to the online dnodes.
 *
 * @param pReq     Incoming request; supplies the mnode handle and trace id.
 * @param dnodeId  Target dnode id; -1 or 0 targets every online dnode.
 * @param pDcfgReq Config request whose `value` carries the encrypt key.
 * @return 0 on success, otherwise an error code (also stored in terrno when
 *         terrno is still 0).
 *
 * The `encryptMgmt.encrypting` flag is claimed for the duration of the
 * operation. It is released here when no request could be sent; otherwise it
 * is left set after the requests have been dispatched.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Only one encrypt-key creation may be in flight at a time.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so free it here.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Leaving the scan early: the iterator must be cancelled explicitly.
          sdbCancelFetch(pSdb, pIter);
          // Fall through to the nEncrypt check below so the `encrypting`
          // flag is released when nothing was sent.
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request went out, so no response will ever clear the flag: clear it now.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1499

1500
/**
 * Handle a client "create encrypt_key" config request.
 *
 * In builds with TD_ENTERPRISE or TD_ASTRA_TODO defined, the request is
 * deserialized, the caller's MND_OPER_CONFIG_DNODE privilege is checked, and
 * an "encrypt_key" config is forwarded to mndProcessCreateEncryptKeyReqImpl.
 * Any other config name yields TSDB_CODE_PAR_INTERNAL_ERROR. In other builds
 * the function returns 0 without doing anything.
 *
 * @param pReq Incoming RPC message.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }
  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    // Copy everything needed out of cfgReq before it is freed, so nothing
    // reads the request structure after tFreeSMCfgDnodeReq().
    int32_t dnodeId = cfgReq.dnodeId;
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1529

1530
/**
 * Handle one dnode's response to a create-encrypt-key request.
 *
 * Bumps the success or failure counter according to pRsp->code and clears
 * `encryptMgmt.encrypting` once the number of responses received reaches the
 * number of requests recorded in `encryptMgmt.nEncrypt`.
 *
 * @param pRsp Response message from a dnode.
 * @return Always 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Increment the counter for this response and read the other one as well:
  // completion depends on the total of both, not just the one touched here.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1554

1555
/**
 * Fill a show-result block with a fixed set of configuration name/value rows.
 *
 * Column 0 receives the option name and column 1 its value, both as varstrs.
 *
 * @param pReq   Incoming request.
 * @param pShow  Show object; its numOfRows is advanced by the rows written.
 * @param pBlock Destination data block.
 * @param rows   Requested row count (not consulted by this function).
 * @return Number of rows written to pBlock.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nCfg = 0;     // number of entries collected below
  int32_t nWritten = 0; // number of rows stored into pBlock
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  names[nCfg] = "statusIntervalMs";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
  ++nCfg;

  names[nCfg] = "timezone";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  ++nCfg;

  names[nCfg] = "locale";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  ++nCfg;

  names[nCfg] = "charset";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  ++nCfg;

  names[nCfg] = "monitor";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  ++nCfg;

  names[nCfg] = "monitorInterval";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  ++nCfg;

  names[nCfg] = "slowLogThreshold";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  ++nCfg;

  names[nCfg] = "slowLogMaxLen";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  ++nCfg;

  char slowLogScope[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, slowLogScope);
  names[nCfg] = "slowLogScope";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", slowLogScope);
  ++nCfg;

  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  // One output row per collected entry; the row index equals the entry index.
  while (nWritten < nCfg) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, names[nWritten], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, nWritten, (const char *)nameVar, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, values[nWritten], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, nWritten, (const char *)valueVar, false), &lino, _OVER);

    nWritten++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nWritten;
  return nWritten;
}
1626

1627
// Intentionally empty: mndRetrieveConfigs never stores an sdb iterator in the
// show object, so there is nothing to release here.
// NOTE(review): presumably registered as the cancel hook for the configs show
// table; confirm at the registration site.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1628

1629
/**
 * Fill a show-result block with one row per dnode.
 *
 * Columns are written in order: id, endpoint, vnode count, supported vnodes,
 * status, create time, reboot time, offline reason and, when TD_ENTERPRISE is
 * defined, machine id. Iteration resumes from pShow->pIter and stops once
 * `rows` rows have been written or the dnode table is exhausted.
 *
 * @param pReq   Incoming request; supplies the mnode handle.
 * @param pShow  Show object holding the iterator and running row count.
 * @param pBlock Destination data block.
 * @param rows   Maximum number of rows to write in this call.
 * @return Number of rows written to pBlock.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // A trailing '*' marks a dnode that is being created/dropped while offline.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Reuse the stack buffer for the offline reason. The previous per-row heap
    // allocation was dereferenced without a NULL check and leaked whenever the
    // column write below failed and jumped to _OVER. The copy is bounded by
    // sizeof(buf), so an over-long reason is truncated rather than overflowing.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    STR_WITH_MAXSIZE_TO_VARSTR(buf, reason, sizeof(buf));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1711

1712
// Cancel an in-progress SDB_DNODE iteration identified by pIter.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1716

1717
/**
 * Collect a copy of the FQDN of every dnode in the sdb.
 *
 * @param pMnode Mnode handle.
 * @return Array of `char *`, each a separately allocated copy of a dnode's
 *         fqdn, or NULL when the array itself cannot be allocated. Entries
 *         that fail to duplicate or to be stored are logged and skipped.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdn array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Never store a NULL entry in the array.
      mError("failed to duplicate fqdn of dnode:%d, but continue at this time", pObj->id);
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy must be freed here.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc