• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3519

05 Nov 2024 11:19AM UTC coverage: 57.706% (+8.4%) from 49.32%
#3519

push

travis-ci

web-flow
Merge pull request #28652 from taosdata/fix/3_liaohj

refactor: always successfully put the retrieve msg

109445 of 245179 branches covered (44.64%)

Branch coverage included in aggregate %.

187435 of 269288 relevant lines covered (69.6%)

12869818.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.71
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndCluster.h"
21
#include "mndDb.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tjson.h"
32
#include "tmisce.h"
33
#include "tunit.h"
34

35
#define TSDB_DNODE_VER_NUMBER   2
36
#define TSDB_DNODE_RESERVE_SIZE 40
37

38
static const char *offlineReason[] = {
39
    "",
40
    "status msg timeout",
41
    "status not received",
42
    "version not match",
43
    "dnodeId not match",
44
    "clusterId not match",
45
    "statusInterval not match",
46
    "timezone not match",
47
    "locale not match",
48
    "charset not match",
49
    "ttlChangeOnWrite not match",
50
    "enableWhiteList not match",
51
    "encryptionKey not match",
52
    "monitor not match",
53
    "monitor switch not match",
54
    "monitor interval not match",
55
    "monitor slow log threshold not match",
56
    "monitor slow log sql max len not match",
57
    "monitor slow log scope not match",
58
    "unknown",
59
};
60

61
enum {
62
  DND_ACTIVE_CODE,
63
  DND_CONN_ACTIVE_CODE,
64
};
65

66
enum {
67
  DND_CREATE,
68
  DND_ADD,
69
  DND_DROP,
70
};
71

72
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
73
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
74
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
75
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
76
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
78
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
79
static int32_t  mndProcessShowVariablesReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
90
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
91
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
92

93
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
94
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
95
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
96
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
97

98
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
99

100
#ifdef _GRANT
101
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
102
#else
103
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
104
#endif
105

106
/*
 * Register everything the dnode module contributes to the mnode:
 * the SDB table callbacks, the RPC message handlers and the SHOW
 * retrieve / free-iterator handlers.
 *
 * Returns whatever sdbSetTable() returns for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* SDB table description for dnode objects, keyed by an int32 id. */
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  /* RPC message handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* SHOW handlers: one retrieve + one iterator-cancel per management table. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
139

140
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
141
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
142
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
143
/* Module teardown hook; the dnode module has nothing to release. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
908✔
144

145
/*
 * Deploy callback of the dnode SDB table: builds dnode 1 from the local
 * fqdn/port (and the machine id when one is available) and commits it
 * through a "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // Report the failure through the return value as well as terrno; the
    // code returned by tGetMachineId() could be 0 even though no id was
    // produced, which would make this function claim success.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  // NOTE(review): once the append succeeded the transaction is assumed to own
  // the raw (the success path never frees it here) - confirm. Drop our
  // reference right away so that a failing sdbSetRawStatus() cannot lead to
  // the raw being freed both by mndTransDrop() and by sdbFreeRaw() below.
  SSdbRaw *pAppended = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppended, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
198

199
/*
 * Serialize a dnode object into a freshly allocated SDB raw record.
 *
 * terrno is primed with TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * field has been written, so any jump to _OVER before that point is treated
 * as a failure. Returns the raw on success, NULL (raw freed) on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  /* code / lino are kept for the SDB_SET_* macros, which may reference them. */
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  /* Field order below defines the on-disk layout read back by the decoder. */
  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
231

232
/*
 * Rebuild a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed binary fields follow the reserved area; they are read and
 * discarded. Returns the new row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  /* code / lino are kept for the SDB_GET_* macros, which may reference them. */
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;
  int8_t     sver = 0;

  /* Pessimistic default: any early jump to _OVER is reported as a failure. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    /* Two (length, bytes) pairs; the payloads are skipped, not stored. */
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
283

284
/*
 * SDB insert callback: marks the new dnode as "status not received" and
 * derives its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char epBuf[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  /* Format into a zeroed scratch buffer first, then copy into the object. */
  (void)snprintf(epBuf, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);

  return 0;
}
293

294
/* SDB delete callback: only traces the event. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
298

299
/*
 * SDB update callback: copies the mutable fields of pNew into the stored
 * object pOld - the update time and, in enterprise builds, the machine id.
 * Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#ifdef TD_ENTERPRISE
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
307

308
/*
 * Acquire the dnode object with the given id from the SDB.
 *
 * Returns the object (to be handed back with mndReleaseDnode()) or NULL.
 * On NULL, terrno is translated from the SDB error to the matching
 * mnode-level dnode error; any other SDB error is logged as fatal and
 * reported as TSDB_CODE_APP_ERROR.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    // Keep the SDB error so the fatal log can name the real cause rather
    // than the generic code terrno is replaced with.
    int32_t sdbCode = terrno;
    if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, tstrerror(sdbCode));
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  return pDnode;
}
326

327
/* Hand a dnode object obtained from the SDB back via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
7,579,094✔
331

332
/*
 * Build an endpoint set containing the dnode's fqdn/port.
 * The result of addEpIntoEpSet() is stored in terrno (also on success).
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  int32_t addCode = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = addCode;
  return result;
}
337

338
/*
 * Look up a dnode by id and return its endpoint set.
 * An all-zero set is returned when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);

  if (pTarget != NULL) {
    result = mndGetDnodeEpset(pTarget);
    mndReleaseDnode(pMnode, pTarget);
  }

  return result;
}
348

349
/*
 * Find the dnode whose endpoint string equals pEpStr (case-insensitive,
 * compared over at most TSDB_EP_LEN bytes) by iterating with sdbFetch().
 *
 * Returns the matching object, still held so the caller must release it,
 * or NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pCand = NULL;
  void      *pCursor = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pCand);

  while (pCursor != NULL) {
    if (strncasecmp(pEpStr, pCand->ep, TSDB_EP_LEN) == 0) {
      /* Stop the iteration but keep the reference on the match. */
      sdbCancelFetch(pSdb, pCursor);
      return pCand;
    }

    sdbRelease(pSdb, pCand);
    pCand = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCand);
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
369

370
/*
 * Same lookup as mndAcquireDnodeByEp(), but iterating with sdbFetchAll().
 * Returns the held match or NULL; terrno is left untouched when nothing
 * matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCand = NULL;
    ESdbStatus candStatus = 0;

    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCand, &candStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (strncasecmp(pEpStr, pCand->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCand);
      continue;
    }

    /* Match: stop the iteration but keep the reference for the caller. */
    sdbCancelFetch(pSdb, pCursor);
    return pCand;
  }
}
390

391
/* Return sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
395

396
/* Return sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
400

401
/*
 * Decide whether a dnode counts as online at time curMs.
 *
 * The dnode is offline when |lastAccessTime - curMs| exceeds
 * 5000 * tsStatusInterval. In that case, a dnode that has a positive
 * rebootTime and is still marked online gets its offline reason switched
 * to "status msg timeout".
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t silence = TABS(pDnode->lastAccessTime - curMs);
  const int64_t limit = 5000 * (int64_t)tsStatusInterval;

  if (silence <= limit) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
411

412
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) per dnode returned by
 * sdbFetchAll() to pDnodeEps.
 *
 * A failed push is only logged; the iteration continues.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the object before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced past this point: use the copied id.
    dnodeEp.isMnode = 0;
    if (mndIsMnode(pMnode, dnodeEp.id)) {
      dnodeEp.isMnode = 1;
    }
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,343✔
438

439
/*
 * Append one SDnodeInfo (id, fqdn, port, offline reason, isMnode flag) per
 * dnode returned by sdbFetchAll() to pDnodeInfo.
 *
 * Returns 0 on success. If a push fails, the iteration is cancelled and
 * the terrno observed at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced past this point: use the copied id.
    if (mndIsMnode(pMnode, dInfo.id)) {
      dInfo.isMnode = 1;
    } else {
      dInfo.isMnode = 0;
    }

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
471

472
/*
 * Compare one monitor parameter reported by a dnode (pCfg->monitorParas.para)
 * with the local global of the same name. On mismatch: log it, set terrno to
 * the monitor-parameter error code and return `err` (a DND_REASON_* value)
 * from the enclosing function.
 *
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site. `para`
 * is used both as a member name and as a variable name, so it cannot be
 * parenthesised. The plain `if` form is kept so existing call sites expand
 * exactly as before.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                    \
  if (pCfg->monitorParas.para != para) {                                                                 \
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
    return err;                                                                                          \
  }
478

479
/*
 * Compare the cluster configuration reported by a dnode (pCfg) with the
 * local settings.
 *
 * Returns DND_REASON_ONLINE when everything matches; otherwise the
 * DND_REASON_* value of the first mismatch, after logging it and setting
 * terrno. Checks run in a fixed order: monitor parameters, status interval,
 * timezone, locale, charset, ttlChangeOnWrite, whitelist switch, encryption
 * key.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Monitor parameters; each macro returns from this function on mismatch. */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThresholdTest, DND_REASON_STATUS_MONITOR_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  if (tsStatusInterval != pCfg->statusInterval) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
           tsStatusInterval);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }

  /* A timezone mismatch is only reported when checkTime differs as well. */
  if ((strcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pCfg->checkTime != pMnode->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (strcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (strcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (tsTtlChangeOnWrite != pCfg->ttlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* Normalise the local switch to 0/1 before comparing. */
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (localWhiteList != pCfg->enableWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* The key comparison is skipped while the encrypting flag is set. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (tsEncryptionKeyStat != pCfg->encryptionKeyStat || tsEncryptionKeyChksum != pCfg->encryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
542

543
/*
 * Bring the vnode state cached in pGid in line with the load report pVload.
 *
 * A change is detected when the sync state, the sync term (ignored when the
 * reported term is -1), the role time, the restore flag, the can-read flag
 * or the start time differ. Returns true when pGid was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleDiffers =
      (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);
  const bool restDiffers = (pGid->syncRestore != pVload->syncRestore) ||
                           (pGid->syncCanRead != pVload->syncCanRead) ||
                           (pGid->startTimeMs != pVload->startTimeMs);

  if (!roleDiffers && !restDiffers) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
565

566
/*
 * Bring the mnode state cached in pObj in line with the load report pMload.
 *
 * A change is detected when the sync state, the sync term (ignored when the
 * reported term is -1), the role time or the restore flag differ.
 * Returns true when pObj was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleDiffers =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleDiffers && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
583

584
extern char* tsMonFwUri;
585
extern char* tsMonSlowLogUri;
586
/*
 * Handle TDMT_MND_STATIS: deserialize the request and pass its content to
 * monSendContent() with the URI selected by the request type (counter ->
 * tsMonFwUri, slow log -> tsMonSlowLogUri). Other types are dropped.
 *
 * Returns early with the deserialization error, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatisReq statisReq = {0};
  int32_t    code = -1;

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  /* Optional dump of the payload for protocol debugging. */
  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
606

607
/*
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message carrying the dnode's id and
 * machine id onto the mnode write queue.
 *
 * Returns 0 when the message was queued, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the serialized length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // A 0 length is a failure too, mapped the same way as in the first pass.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it is still ours to free.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
637

638
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the update time of the dnode
 * named in the request and persist the object through an
 * "update-dnode-obj" transaction.
 *
 * Returns 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // For the NULL results below, always leave - even when terrno is 0 - so a
  // NULL pointer is never used further down.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // NOTE(review): once the append succeeded the transaction is assumed to own
  // the raw (the success path never frees it here) - confirm. Drop our
  // reference right away so that a failing sdbSetRawStatus() cannot lead to
  // the raw being freed both by mndTransDrop() and by sdbFreeRaw() below.
  SSdbRaw *pAppended = pCommitRaw;
  pCommitRaw = NULL;
  TAOS_CHECK_EXIT(sdbSetRawStatus(pAppended, SDB_STATUS_READY));

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
684

685
/*
 * Handle a periodic status message sent by a dnode.
 *
 * Steps, in order:
 *   1. deserialize the request and reject it when it carries a different cluster id;
 *   2. resolve the dnode object by id, falling back to its endpoint;
 *   3. copy the reported vnode / mnode / qnode load into the in-memory objects;
 *   4. when anything relevant changed (needCheck), validate version, cluster id and
 *      cluster configuration, refresh the dnode attributes and build an SStatusRsp
 *      that is attached to pReq->info.rsp;
 *   5. bump the dnode access counters.
 *
 * The value returned is always the result of mndUpdClusterInfo(pReq); the local
 * `code` is computed but not returned.
 * NOTE(review): confirm that ignoring `code` at the end is intentional.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has no id yet, so it can only be found by its endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but was reported with a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
          pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Apply the per-vnode load report to the matching vgroups.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Usage statistics are only taken from the replica that reports itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only computes the encoded size; a negative value is an error and
    // must not be used as an allocation size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = terrno;
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never attached to the request, so release it here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
902

903
/*
 * Handle a notify message from a dnode: copy the reported time-series count of
 * every listed vnode into its vgroup object, then refresh the cluster info.
 *
 * Returns the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER when the
 * message carries a different (non-zero) cluster id, or otherwise the result of
 * mndUpdClusterInfo(). The decoded request is freed on every path.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq req = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &req);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);

    if (req.clusterId != 0 && req.clusterId != localClusterId) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", req.dnodeId,
            req.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t numOfLoads = taosArrayGetSize(req.pVloads);
      int32_t idx = 0;

      while (idx < numOfLoads) {
        SVnodeLoadLite *pLoad = taosArrayGet(req.pVloads, idx);
        SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);

        // Vgroups that are unknown to this mnode are silently skipped.
        if (pVg != NULL) {
          pVg->numOfTimeSeries = pLoad->nTimeSeries;
          mndReleaseVgroup(pMnode, pVg);
        }
        ++idx;
      }

      code = mndUpdClusterInfo(pReq);
    }
  }

  tFreeSNotifyReq(&req);
  return code;
}
936

937
/*
 * Build a new dnode object from a create request and submit it through a
 * "create-dnode" transaction.
 *
 * pMnode   mnode that owns the sdb and the transaction.
 * pReq     originating request, handed to the transaction.
 * pCreate  decoded request; fqdn and port are read.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * The transaction handle and any raw still held locally are released before
 * returning.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pCommitRaw = NULL;
  SDnodeObj newDnode = {0};

  // Fill in the new object: next free id, creation time, endpoint "fqdn:port".
  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&newDnode);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  // The local pointer is dropped so the cleanup below does not free it.
  pCommitRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  // TODO: check the return value
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, newDnode.fqdn, IP_WHITE_ADD, 1);

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  return code;
}
979

980
/*
 * Answer a dnode-list request with one single-endpoint SEpSet (fqdn + port) per
 * dnode stored in the sdb.
 *
 * On success the serialized SDnodeListRsp is attached to pReq->info.rsp and 0 is
 * returned. On failure nothing is attached, the response buffer (if already
 * allocated) is released and an error code is returned.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SEpSet epSet = {0};
    epSet.numOfEps = 1;
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: give back the current object and the iterator.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded size; a negative value is an error and
  // must not be used as an allocation size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was never attached to the request, so release it here.
    rpcFreeCont(pRsp);
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1040

1041
static void getSlowLogScopeString(int32_t scope, char* result){
15✔
1042
  if(scope == SLOW_LOG_TYPE_NULL) {
15!
1043
    (void)strcat(result, "NONE");
×
1044
    return;
×
1045
  }
1046
  while(scope > 0){
30✔
1047
    if(scope & SLOW_LOG_TYPE_QUERY) {
15!
1048
      (void)strcat(result, "QUERY");
15✔
1049
      scope &= ~SLOW_LOG_TYPE_QUERY;
15✔
1050
    } else if(scope & SLOW_LOG_TYPE_INSERT) {
×
1051
      (void)strcat(result, "INSERT");
×
1052
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1053
    } else if(scope & SLOW_LOG_TYPE_OTHERS) {
×
1054
      (void)strcat(result, "OTHERS");
×
1055
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1056
    } else{
1057
      (void)printf("invalid slow log scope:%d", scope);
×
1058
      return;
×
1059
    }
1060

1061
    if(scope > 0) {
15!
1062
      (void)strcat(result, "|");
×
1063
    }
1064
  }
1065
}
1066

1067
/*
 * Answer a "show variables" request with a fixed list of configuration entries
 * (name, value, scope): statusInterval, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope.
 *
 * The caller must hold the MND_OPER_SHOW_VARIABLES privilege; when the check
 * fails its error code is returned as is.
 *
 * On success the serialized SShowVariablesRsp is attached to pReq->info.rsp and
 * 0 is returned. On failure nothing is attached and the response buffer (if
 * already allocated) is released.
 */
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
  SShowVariablesRsp rsp = {0};
  int32_t           code = -1;

  // Keep the real error code of the privilege check instead of a generic -1.
  if ((code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIABLES)) != 0) {
    goto _OVER;
  }

  rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
  if (NULL == rsp.variables) {
    mError("failed to alloc SVariablesInfo array while process show variables req");
    code = terrno;
    goto _OVER;
  }

  // One scratch entry is reused for every variable; each push copies it into the array.
  SVariablesInfo info = {0};

  (void)strcpy(info.name, "statusInterval");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "timezone");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  (void)strcpy(info.scope, "both");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "locale");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  (void)strcpy(info.scope, "both");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "charset");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  (void)strcpy(info.scope, "both");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "monitor");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "monitorInterval");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "slowLogThreshold");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  (void)strcpy(info.name, "slowLogMaxLen");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  (void)strcpy(info.name, "slowLogScope");
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
  (void)strcpy(info.scope, "server");
  if (taosArrayPush(rsp.variables, &info) == NULL) {
    code = terrno;
    goto _OVER;
  }

  // First pass only computes the encoded size; a negative value is an error and
  // must not be used as an allocation size.
  int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was never attached to the request, so release it here.
    rpcFreeCont(pRsp);
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get show variables info since %s", tstrerror(code));
  }

  tFreeSShowVariablesRsp(&rsp);
  TAOS_RETURN(code);
}
1183

1184
/*
 * Handle a "create dnode" request.
 *
 * Checks the dnode and cpu-core grants, decodes the request, verifies the
 * MND_OPER_CREATE_DNODE privilege, validates fqdn/port, rejects an endpoint that
 * already exists and finally starts the create transaction via mndCreateDnode().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise an error code. An audit record is written once the create has been
 * attempted, whether or not it succeeded.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};

  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  // A dnode is identified by its endpoint "fqdn:port"; it must not exist yet.
  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // Bounded formatting: the audit object name can never overrun its buffer.
  char obj[200] = {0};
  (void)snprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1232

1233
/* Implementation hook for the restore-dnode request; defined below for builds without TD_ENTERPRISE. */
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

/* Handle a restore-dnode request by forwarding it to the implementation hook. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  int32_t result = mndProcessRestoreDnodeReqImpl(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0. */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1240

1241
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
22✔
1242
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1243
  int32_t  code = -1;
22✔
1244
  SSdbRaw *pRaw = NULL;
22✔
1245
  STrans  *pTrans = NULL;
22✔
1246

1247
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
22✔
1248
  if (pTrans == NULL) {
22!
1249
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1250
    if (terrno != 0) code = terrno;
×
1251
    goto _OVER;
×
1252
  }
1253
  mndTransSetSerial(pTrans);
22✔
1254
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
22!
1255
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
22!
1256

1257
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1258
  if (pRaw == NULL) {
22!
1259
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1260
    if (terrno != 0) code = terrno;
×
1261
    goto _OVER;
×
1262
  }
1263
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
22!
1264
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
22!
1265
  pRaw = NULL;
22✔
1266

1267
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1268
  if (pRaw == NULL) {
22!
1269
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1270
    if (terrno != 0) code = terrno;
×
1271
    goto _OVER;
×
1272
  }
1273
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
22!
1274
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
22!
1275
  pRaw = NULL;
22✔
1276

1277
  if (pMObj != NULL) {
22✔
1278
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1279
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
4!
1280
  }
1281

1282
  if (pQObj != NULL) {
22✔
1283
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1284
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
3!
1285
  }
1286

1287
  if (pSObj != NULL) {
22✔
1288
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1289
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
3!
1290
  }
1291

1292
  if (numOfVnodes > 0) {
22✔
1293
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
16!
1294
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
16✔
1295
  }
1296

1297
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
20!
1298

1299
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
20✔
1300
                                   1);  // TODO: check the return value
1301
  code = 0;
20✔
1302

1303
_OVER:
22✔
1304
  mndTransDrop(pTrans);
22✔
1305
  sdbFreeRaw(pRaw);
22✔
1306
  TAOS_RETURN(code);
22✔
1307
}
1308

1309
/*
 * Tell whether a dnode hosts nothing: no qnode, no snode, no mnode and no vnode.
 *
 * The lookups run in that order and stop at the first hit. Whatever was
 * acquired is released before returning.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SMnodeObj *pMnodeObj = NULL;
  SSnodeObj *pSnodeObj = NULL;
  SQnodeObj *pQnodeObj = mndAcquireQnode(pMnode, dnodeId);
  bool       empty = false;

  if (pQnodeObj == NULL) {
    pSnodeObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSnodeObj == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        // No service node is present; the dnode is empty only without vnodes.
        empty = !(mndGetVnodesNum(pMnode, dnodeId) > 0);
      }
    }
  }

  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnodeObj);
  mndReleaseSnode(pMnode, pSnodeObj);
  return empty;
}
1334

1335
/*
 * Handle a "drop dnode" request.
 *
 * The dnode is looked up by dropReq.dnodeId and, when that fails, by the
 * endpoint "fqdn:port" built from the request. The drop is refused when
 *   - the dnode hosts the only mnode (TSDB_CODE_MND_TOO_FEW_MNODES),
 *   - the dnode hosts this mnode itself (TSDB_CODE_MND_CANT_DROP_LEADER),
 *   - force/unsafe is used on an online dnode
 *     (TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE),
 *   - the dnode is offline, not empty and force is not set
 *     (TSDB_CODE_DNODE_OFFLINE).
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started,
 * otherwise an error code. All acquired objects and the decoded request are
 * released on every path.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although a
  // dnode is being dropped - confirm this is the intended operation type.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a literal format; the fqdn comes from the request
    // and must never be interpreted as a format string.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually resolved: when it was found by
  // endpoint, dropReq.dnodeId does not identify it.
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)snprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1419

1420
/*
 * Convert an mnode-level config request into the dnode-level form.
 *
 * Two input shapes are accepted:
 *   - config = "key value" with an empty value field: the text up to the first
 *     space becomes the key, the rest becomes the value;
 *   - config = "key" with a non-empty value field: both are copied as they are.
 * Any other combination is rejected with TSDB_CODE_INVALID_CFG. The key is
 * written to pDCfgReq->config before the combination is validated.
 *
 * No bound is checked here; the destination fields must be large enough.
 */
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
  int32_t     code = 0;
  const char *option = pMCfgReq->config;
  const char *sep = strchr(option, ' ');
  size_t      keyLen = (sep != NULL) ? (size_t)(sep - option) : strlen(option);

  // The key is everything before the first space (or the whole string).
  (void)memcpy(pDCfgReq->config, option, keyLen);
  pDCfgReq->config[keyLen] = 0;

  bool valueInline = (sep != NULL);
  bool valueInField = (pMCfgReq->value[0] != '\0');

  // Exactly one of the two places must carry the value.
  if (valueInline == valueInField) {
    mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
    code = TSDB_CODE_INVALID_CFG;
    TAOS_RETURN(code);
  }

  if (valueInline) {
    // 'key value'
    (void)strcpy(pDCfgReq->value, sep + 1);
  } else {
    // 'key' 'value'
    (void)strcpy(pDCfgReq->value, pMCfgReq->value);
  }

  TAOS_RETURN(code);
}
1451

1452
/*
 * Send a config request to one dnode or to all of them.
 *
 * dnodeId   id of the target dnode; -1 or 0 selects every dnode.
 * pDcfgReq  request that is serialized once per target.
 *
 * Returns the result of the last send attempt, TSDB_CODE_MND_DNODE_NOT_EXIST
 * when no dnode matched, or the serialization error, in which case the
 * iteration stops at once. When the message buffer cannot be allocated for a
 * target, that target is skipped and the allocation error is recorded unless a
 * later send overwrites it.
 */
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // Leaving the loop early: free the unsent buffer and give back the
          // current object and the iterator before returning.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
        mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
        code = tmsgSendReq(&epSet, &rpcMsg);
      } else {
        // Report the allocation failure instead of the misleading
        // "dnode not exist" that an untouched code of -1 would turn into.
        code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  if (code == -1) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
  }
  TAOS_RETURN(code);
}
1485

1486
/**
 * Handle a config-dnode request: check privilege, translate the mnode-level
 * request into a dnode-level one, audit it, and forward it to the dnode(s).
 *
 * "resetlog" is forwarded as-is. In enterprise builds "s3blocksize" is parsed
 * and range-checked here. Everything else goes through mndMCfg2DCfg() and
 * cfgCheckRangeForDynUpdate().
 *
 * @param pReq incoming rpc message carrying a serialized SMCfgDnodeReq
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
  int8_t updateIpWhiteList = 0;
  mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    goto _err_out;
  }

  SDCfgDnodeReq dcfgReq = {0};
  if (strcasecmp(cfgReq.config, "resetlog") == 0) {
    (void)strcpy(dcfgReq.config, "resetlog");
#ifdef TD_ENTERPRISE
  } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
    int32_t optLen = strlen("s3blocksize");
    int32_t flag = -1;
    // Use the function-level 'code' and the shared cleanup label so that cfgReq
    // is always freed when parsing fails.
    code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
    if (code != 0) goto _err_out;

    if (flag > 1024 * 1024 || (flag > -1 && flag < 1024) || flag < -1) {
      mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]",
             cfgReq.dnodeId, flag);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }

    strcpy(dcfgReq.config, "s3blocksize");
    snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#endif
  } else {
    TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
    if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
      mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }
    if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
      updateIpWhiteList = 1;
    }

    TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true), NULL, _err_out);
  }

  {  // audit
    char obj[50] = {0};
    (void)snprintf(obj, sizeof(obj), "%d", cfgReq.dnodeId);

    auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
  }

  // Capture the target id before the request is released.
  const int32_t dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);

  code = mndSendCfgDnodeReq(pMnode, dnodeId, &dcfgReq);

  // The white list is refreshed regardless of whether the send succeeded.
  if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
  TAOS_RETURN(code);

_err_out:
  tFreeSMCfgDnodeReq(&cfgReq);
  TAOS_RETURN(code);
}
1552

1553
/**
 * Handle the response to a config-dnode request.
 * Only logs the arrival of the response; the message content is not inspected.
 *
 * @param pRsp response message (unused)
 * @return always 0
 */
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
  const int32_t code = 0;
  mInfo("config rsp from dnode");
  return code;
}
1557

1558
/**
 * Validate an encrypt-key request and send it to the online dnode(s).
 *
 * The mnode-wide 'encrypting' flag is taken with a compare-exchange before any
 * request is sent. It is cleared again here when no request ended up in flight;
 * otherwise mndProcessCreateEncryptKeyRsp() clears it once all replies are in.
 *
 * @param pReq     originating rpc message (used for logging / trace id)
 * @param dnodeId  target dnode id; -1 or 0 matches every dnode
 * @param pDcfgReq request whose 'value' holds the key
 * @return 0 on success, otherwise an error code (also stored in terrno when
 *         terrno was 0)
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport; the fetched object and
          // the iterator are still held. Break (not goto _exit) so the check
          // below can clear 'encrypting' when nothing is in flight.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the flag.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1635

1636
/**
 * Entry point for the create-encrypt-key request.
 *
 * Enterprise builds deserialize the request, check the CONFIG_DNODE privilege
 * and, when the option is "encrypt_key", hand it to
 * mndProcessCreateEncryptKeyReqImpl(). Any other option yields
 * TSDB_CODE_PAR_INTERNAL_ERROR. Non-enterprise builds return 0 unconditionally.
 *
 * @param pReq incoming rpc message carrying a serialized SMCfgDnodeReq
 * @return 0 on success, otherwise an error code
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#ifdef TD_ENTERPRISE
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE);
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 includes the terminating NUL, so only an exact match passes.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  strcpy(dcfgReq.config, cfgReq.config);
  strcpy(dcfgReq.value, cfgReq.value);
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1665

1666
/**
 * Handle one dnode's reply to a create-encrypt-key request.
 *
 * Bumps the success or failure counter according to pRsp->code, then clears
 * the mnode-wide 'encrypting' flag once the number of replies (success +
 * failure) has reached the number of requests that were sent.
 *
 * @param pRsp response message from a dnode
 * @return always 0
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Both counters must be read: the completion test is on their sum, and with
  // mixed outcomes the counter not incremented by this reply is non-zero too.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1690

1691
/**
 * Fill a result block with a fixed list of (option name, value) rows taken
 * from global settings.
 *
 * Column 0 receives the option name and column 1 the value, both as varstr.
 * Filling stops at the first colDataSetVal() failure; rows written before the
 * failure are still counted.
 *
 * @param pReq   request message
 * @param pShow  show object; its numOfRows is increased by the rows written
 * @param pBlock destination data block
 * @param rows   not used by this function
 * @return number of rows written
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nOpts = 0;    // number of options collected
  int32_t nFilled = 0;  // number of rows written to the block
  char   *optNames[TSDB_CONFIG_NUMBER] = {0};
  char    optVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  optNames[nOpts] = "statusInterval";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);

  optNames[nOpts] = "timezone";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  optNames[nOpts] = "locale";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  optNames[nOpts] = "charset";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  optNames[nOpts] = "monitor";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  optNames[nOpts] = "monitorInterval";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  optNames[nOpts] = "slowLogThreshold";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  optNames[nOpts] = "slowLogMaxLen";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  optNames[nOpts] = "slowLogScope";
  (void)snprintf(optVals[nOpts++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t idx = 0; idx < nOpts; ++idx) {
    SColumnInfoData *pCol = NULL;

    // column 0: option name
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, optNames[idx], TSDB_CONFIG_OPTION_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)nameBuf, false), &lino, _OVER);

    // column 1: option value
    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, optVals[idx], TSDB_CONFIG_VALUE_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)valueBuf, false), &lino, _OVER);

    nFilled += 1;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nFilled;
  return nFilled;
}
1762

1763
// Intentionally empty: nothing is released here.
// NOTE(review): presumably the cancel callback paired with mndRetrieveConfigs(), which
// does not touch pShow->pIter — confirm at the registration site.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1764

1765
/**
 * Fill a result block with one row per dnode object.
 *
 * Columns are written in this order: id, endpoint, vnode count, supported vnode
 * count, status, create time, reboot time, offline reason and, in enterprise
 * builds, machine id. Iteration resumes from pShow->pIter and stops after
 * 'rows' rows, when the sdb is exhausted, or at the first column-write
 * failure.
 *
 * @param pReq   request message; its info.node is the mnode
 * @param pShow  show object holding the iterator; numOfRows is increased
 * @param pBlock destination data block
 * @param rows   maximum number of rows to write in this call
 * @return number of rows written
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // A trailing '*' marks an object that is being created/dropped while offline.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason: reuse the stack buffer instead of a heap allocation, so
    // there is no NULL result to check and nothing to leak when the macro below
    // jumps to _OVER. The copy is bounded by the buffer size.
    // NOTE(review): assumes every offlineReason[] text is shorter than TSDB_EP_LEN - confirm.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    STR_WITH_MAXSIZE_TO_VARSTR(buf, reason, TSDB_EP_LEN);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1847

1848
/**
 * Cancel an in-progress SDB_DNODE iteration.
 *
 * @param pMnode mnode owning the sdb
 * @param pIter  iterator to cancel
 */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1852

1853
// get int32_t value from 'SMCfgDnodeReq'
1854
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
×
1855
  int32_t code = 0;
×
1856
  if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
×
1857
    goto _err;
×
1858
  }
1859

1860
  if (' ' == pMCfgReq->config[optLen]) {
×
1861
    // 'key value'
1862
    if (strlen(pMCfgReq->value) != 0) goto _err;
×
1863
    *pOutValue = atoi(pMCfgReq->config + optLen + 1);
×
1864
  } else {
1865
    // 'key' 'value'
1866
    if (strlen(pMCfgReq->value) == 0) goto _err;
×
1867
    *pOutValue = atoi(pMCfgReq->value);
×
1868
  }
1869

1870
  TAOS_RETURN(code);
×
1871

1872
_err:
×
1873
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
×
1874
  code = TSDB_CODE_INVALID_CFG;
×
1875
  TAOS_RETURN(code);
×
1876
}
1877

1878
/**
 * Collect a copy of the fqdn of every dnode object.
 *
 * Each array element is a 'char *' produced by taosStrdup().
 * NOTE(review): the caller presumably owns the array and the strings - confirm
 * against the callers.
 * A dnode whose fqdn cannot be duplicated or appended is skipped.
 *
 * @param pMnode mnode whose sdb is scanned
 * @return array of duplicated fqdn strings, or NULL if the array could not be
 *         created
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdn array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Never store a NULL entry; consumers would dereference it.
      mError("dnode:%d, failed to dup fqdn, but continue at this time", pObj->id);
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      taosMemoryFreeClear(fqdn);  // the array did not take the string
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc