• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.16
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndCluster.h"
21
#include "mndDb.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tjson.h"
32
#include "tmisce.h"
33
#include "tunit.h"
34

35
#define TSDB_DNODE_VER_NUMBER   2
36
#define TSDB_DNODE_RESERVE_SIZE 40
37

38
/*
 * Human-readable text for each dnode offline reason. The leading comment on
 * every entry is its array index.
 * NOTE(review): presumably indexed by the DND_REASON_* values — confirm against
 * the enum definition, which is not visible here.
 */
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
60

61
/* Active-code selectors; values are spelled out but match the implicit numbering. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};

/* Dnode operation selectors; values are spelled out but match the implicit numbering. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
71

72
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
73
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
74
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
75
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
76
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
78
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
79
static int32_t  mndProcessShowVariablesReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
100

101
#ifdef _GRANT
102
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
103
#else
104
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
105
#endif
106

107
/**
 * Registers everything dnode-related on the mnode: the message handlers, the
 * show-retrieve / free-iterator callbacks, and finally the SDB_DNODE table.
 *
 * @param pMnode mnode instance to register on.
 * @return the value returned by sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* Zero the descriptor first so that every callback not set below stays NULL. */
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  /* Request / response handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* SHOW support for the configs and dnode tables. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
141

142
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
143
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
144
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
145
/* Counterpart of mndInitDnode(); currently there is nothing to tear down. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
2,011✔
146

147
/**
 * Deploy callback of the dnode table: builds dnode 1 from the local fqdn/port
 * (plus the machine id when one can be obtained) and submits it through a
 * "create-dnode" transaction.
 *
 * @param pMnode mnode performing the first deployment.
 * @return 0 once the transaction has been prepared, otherwise an error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  /* The variable must be called `code`: the TAOS_CHECK_GOTO invocations below
   * pass no result variable, so the macro is presumed to store into `code`
   * (NOTE(review): confirm against the macro definition). */
  int32_t   code = -1;
  SSdbRaw  *pRaw = NULL;
  STrans   *pTrans = NULL;
  char     *pMachineId = NULL;
  SDnodeObj firstDnode = {0};

  firstDnode.id = 1;
  firstDnode.createdTime = taosGetTimestampMs();
  firstDnode.updateTime = firstDnode.createdTime;
  firstDnode.port = tsServerPort;
  tstrncpy(firstDnode.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  firstDnode.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(firstDnode.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&pMachineId);
  if (pMachineId == NULL) {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    /* Only this build flavour treats a missing machine id as fatal. */
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  } else {
    (void)memcpy(firstDnode.machineId, pMachineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(pMachineId);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    /* Prefer the specific terrno when one was recorded. */
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, firstDnode.ep);

  pRaw = mndDnodeActionEncode(&firstDnode);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  /* Clear the local pointer so the cleanup below does not free the raw again. */
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, firstDnode.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
200

201
/**
 * Serializes a dnode object into a freshly allocated sdb raw record.
 *
 * Fields are written in this order: id, createdTime, updateTime, port, fqdn,
 * machineId, the reserved area, then two zero int16 values.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL (with terrno set) on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  /* `code` and `lino` are not used directly here.
   * NOTE(review): presumably referenced by the SDB_SET_* macros — confirm before removing. */
  int32_t code = 0;
  int32_t lino = 0;

  /* Pessimistic default; cleared only after every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
233

234
/**
 * Rebuilds a dnode row from an sdb raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed blobs follow the reserved area; they are read and discarded.
 *
 * @param pRaw record to decode.
 * @return the new row, or NULL (with terrno set) on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  /* `code` and `lino` are not used directly here.
   * NOTE(review): presumably referenced by the SDB_GET_* macros — confirm before removing. */
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;
  int8_t     sver = 0;

  /* Pessimistic default; cleared only after all fields have been read. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    /* Two <int16 length><bytes> pairs; the payload is skipped (NULL destination). */
    int16_t blobLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &blobLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, blobLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &blobLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, blobLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
285

286
/**
 * Insert callback of the dnode table: marks the new row as "status not
 * received" and derives its "fqdn:port" endpoint string.
 *
 * @param pSdb   unused.
 * @param pDnode row being inserted.
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char epBuf[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  /* Format into a scratch buffer first, then copy it into the row. */
  (void)snprintf(epBuf, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);

  return 0;
}
295

296
/**
 * Delete callback of the dnode table; only traces the event.
 *
 * @param pSdb   unused.
 * @param pDnode row being deleted.
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t rc = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return rc;
}
300

301
/**
 * Update callback of the dnode table: copies the mutable fields of the new
 * row into the existing one (updateTime, and machineId in enterprise builds).
 *
 * @param pSdb unused.
 * @param pOld row currently stored; modified in place.
 * @param pNew row carrying the new values.
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#ifdef TD_ENTERPRISE
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
309

310
/**
 * Looks up a dnode by id in the sdb and returns it; the caller gives it back
 * with mndReleaseDnode().
 *
 * On failure the sdb-level terrno is translated to the dnode-level code:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_MND_DNODE_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_DNODE_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_DNODE_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * @param pMnode  mnode owning the sdb.
 * @param dnodeId id of the dnode to acquire.
 * @return the dnode object, or NULL with terrno set as described above.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      // Log while terrno still holds the unexpected sdb error; overwriting it
      // first would make terrstr() always report the generic app error.
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  return pDnode;
}
328

329
/**
 * Hands a dnode object back to the mnode's sdb via sdbRelease().
 *
 * @param pMnode mnode owning the sdb.
 * @param pDnode object to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
18,448,604✔
333

334
/**
 * Builds an endpoint set holding the dnode's fqdn and port.
 *
 * terrno is overwritten with the result of addEpIntoEpSet().
 *
 * @param pDnode dnode to describe.
 * @return the endpoint set, by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet dnodeEpSet = {0};

  terrno = addEpIntoEpSet(&dnodeEpSet, pDnode->fqdn, pDnode->port);

  return dnodeEpSet;
}
339

340
/**
 * Same as mndGetDnodeEpset(), but looks the dnode up by id first.
 *
 * @param pMnode  mnode owning the sdb.
 * @param dnodeId id of the dnode.
 * @return the endpoint set, or a zeroed set when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pFound = mndAcquireDnode(pMnode, dnodeId);

  if (pFound != NULL) {
    result = mndGetDnodeEpset(pFound);
    mndReleaseDnode(pMnode, pFound);
  }

  return result;
}
350

351
/**
 * Scans the dnode table with sdbFetch() for an entry whose endpoint string
 * equals pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * @param pMnode mnode owning the sdb.
 * @param pEpStr endpoint to search for.
 * @return the matching dnode (not released here, so the caller must release
 *         it), or NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      /* Table exhausted without a match. */
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (strncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    /* Match: stop the iteration and hand the object to the caller. */
    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
371

372
/**
 * Variant of mndAcquireDnodeByEp() that iterates with sdbFetchAll(), which
 * also reports each object's status through an out-parameter (ignored here).
 * Unlike mndAcquireDnodeByEp(), terrno is left untouched when nothing matches.
 *
 * @param pMnode mnode owning the sdb.
 * @param pEpStr endpoint to search for (case-insensitive comparison).
 * @return the matching dnode (not released here, so the caller must release
 *         it), or NULL.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;
    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &rowStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (strncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
392

393
/**
 * @param pMnode mnode owning the sdb.
 * @return sdbGetSize() for the SDB_DNODE table.
 */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
397

398
/**
 * @param pMnode mnode owning the sdb.
 * @return sdbGetSize() for the SDB_DB table.
 */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
402

403
/**
 * Decides whether a dnode counts as online at time curMs.
 *
 * A dnode is offline when |lastAccessTime - curMs| exceeds
 * 5000 * tsStatusInterval. In that case, if rebootTime is positive and the
 * dnode was still marked online, its offline reason becomes "status msg
 * timeout".
 *
 * @param pDnode dnode to check; offlineReason may be modified.
 * @param curMs  current time in milliseconds.
 * @return true when online, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  const int64_t limitMs = 5000 * (int64_t)tsStatusInterval;

  if (elapsedMs <= limitMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
413

414
/**
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * A failed array push is logged and the iteration continues.
 *
 * @param pMnode    mnode owning the sdb.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the object before giving it back.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode has been released and must not be dereferenced any more; use the
    // id copied above instead of reading pDnode->id.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
4,112✔
440

441
/**
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * @param pMnode     mnode owning the sdb.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success; terrno of the failed taosArrayPush() otherwise, in
 *         which case the iteration is cancelled early.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialize so that no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode has been released and must not be dereferenced any more; use the
    // id copied above instead of reading pDnode->id.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
473

474
#define CHECK_MONITOR_PARA(para,err) \
475
if (pCfg->monitorParas.para != para) { \
476
  mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
477
  terrno = err; \
478
  return err;\
479
}
480

481
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
4,112✔
482
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
4,112!
483
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
4,112!
484
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
4,112!
485
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
4,112!
486
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
4,112!
487

488
  if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
4,112!
489
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
×
490
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
491
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
492
  }
493

494
  if (pCfg->statusInterval != tsStatusInterval) {
4,112!
495
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
496
           tsStatusInterval);
497
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
498
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
499
  }
500

501
  if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
4,112!
502
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
503
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
504
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
505
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
506
  }
507

508
  if (0 != strcasecmp(pCfg->locale, tsLocale)) {
4,112!
509
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
510
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
511
    return DND_REASON_LOCALE_NOT_MATCH;
×
512
  }
513

514
  if (0 != strcasecmp(pCfg->charset, tsCharset)) {
4,112!
515
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
516
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
517
    return DND_REASON_CHARSET_NOT_MATCH;
×
518
  }
519

520
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
4,112!
521
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
522
           tsTtlChangeOnWrite);
523
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
524
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
525
  }
526
  int8_t enable = tsEnableWhiteList ? 1 : 0;
4,112✔
527
  if (pCfg->enableWhiteList != enable) {
4,112!
528
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
529
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
530
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
531
  }
532

533
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
4,112!
534
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
4,112!
535
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
536
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
537
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
538
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
539
  }
540

541
  return DND_REASON_ONLINE;
4,112✔
542
}
543

544
/**
 * Copies the sync state reported in a vnode load into the vgroup member
 * record when any of it differs.
 *
 * A change is detected when syncState, roleTimeMs, syncRestore, syncCanRead
 * or startTimeMs differ, or when the reported syncTerm is not -1 and differs.
 *
 * @param vgId   vgroup id, used for logging only.
 * @param pGid   member record to update.
 * @param pVload load reported by the dnode.
 * @return true when the record was updated, false when nothing changed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  const bool roleChanged = pGid->syncState != pVload->syncState ||
                           (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
                           pGid->roleTimeMs != pVload->roleTimeMs;
  const bool otherChanged = pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
                            pGid->startTimeMs != pVload->startTimeMs;

  if (!roleChanged && !otherChanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
566

567
/**
 * Copies the sync state reported in an mnode load into the mnode object when
 * any of it differs.
 *
 * A change is detected when syncState, roleTimeMs or syncRestore differ, or
 * when the reported syncTerm is not -1 and differs.
 *
 * @param pObj   mnode object to update.
 * @param pMload load reported by the dnode.
 * @return true when the object was updated, false when nothing changed.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool roleChanged = pObj->syncState != pMload->syncState ||
                           (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
                           pObj->roleTimeMs != pMload->roleTimeMs;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
584

585
extern char* tsMonFwUri;
586
extern char* tsMonSlowLogUri;
587
/**
 * Handles TDMT_MND_STATIS: deserializes the request and forwards its content
 * with monSendContent() to tsMonFwUri (counter type) or tsMonSlowLogUri
 * (slow-log type). Other types are dropped silently.
 *
 * @param pReq incoming RPC message.
 * @return 0, or the deserialization error.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &req));

  /* Optional dump of the payload. */
  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", req.pCont);
  }

  if (req.type == MONITOR_TYPE_COUNTER) {
    monSendContent(req.pCont, tsMonFwUri);
  } else if (req.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(req.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&req);
  return 0;
}
607

608
/**
 * Handles TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserializes the request and records it through auditAddRecord().
 *
 * @param pReq incoming RPC message.
 * @return 0, or the deserialization error.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  /* Nothing to do unless both switches are on. */
  if (!tsEnableAudit || !tsEnableAuditDelete) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &req));

  mDebug("received audit req:%s, %s, %s, %s", req.operation, req.db, req.table, req.pSql);

  auditAddRecord(pReq, pMnode->clusterId, req.operation, req.db, req.table, req.pSql, req.sqlLen);

  tFreeSAuditReq(&req);
  return 0;
}
625

626
/**
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue,
 * carrying the dnode's id and machine id.
 *
 * @param pMnode mnode whose message callbacks are used.
 * @param pDnode dnode whose id and machineId are sent.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Map a zero length to an error as above, so that a message that was never
    // queued is not reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it has to be freed here.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): on failure tmsgPutToQueue is presumed to dispose of pCont
  // itself, so no free is added on that path — confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
656

657
/**
 * Handles TDMT_MND_UPDATE_DNODE_INFO: refreshes the dnode's updateTime and
 * persists the dnode object through an "update-dnode-obj" transaction.
 *
 * @param pReq incoming RPC message carrying a serialized SDnodeInfoReq.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  /* TAOS_CHECK_EXIT is invoked without naming a result variable, so the names
   * `code` / `lino` / `_exit` are kept as-is (NOTE(review): presumably used by
   * the macro — confirm). */
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  /* Clear the local pointer so the cleanup below does not free the raw again. */
  pCommitRaw = NULL;

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
703

704
/*
 * Handle a periodic status message sent by a dnode.
 *
 * Steps, in order:
 *   1. decode the request and reject messages from a different cluster;
 *   2. resolve the dnode object by id, falling back to its endpoint;
 *   3. fold the reported vnode / mnode / qnode loads into the in-memory objects;
 *   4. when anything relevant changed (needCheck), validate version, cluster id
 *      and cluster config, refresh the dnode attributes and build a status
 *      response carrying the current dnode endpoint list;
 *   5. bump the access counters.
 *
 * NOTE(review): failures are recorded in the local `code` or in terrno, but the
 * value actually returned is whatever mndUpdClusterInfo() yields - confirm that
 * this is intended.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The request carries no dnode id, so look the dnode up by endpoint only.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id: record why it is offline.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
          pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Merge every reported vnode load into its vgroup.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the (assigned) leader replica.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: update the dnode object via mndUpdateDnodeObj().
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only measures the encoded size; bail out before allocating
    // if even that fails.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer has not been handed to pReq yet, so it must be freed here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
921

922
/*
 * Handle a notify message from a dnode: copy the reported time-series count
 * of each vnode into its vgroup, then refresh the cluster info.
 *
 * Returns the decode error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER for a foreign
 * cluster id, or the result of mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t clusterid = mndGetClusterId(pMnode);
  // A cluster id of 0 in the request skips the cluster check.
  if (!(notifyReq.clusterId == 0 || notifyReq.clusterId == clusterid)) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  int32_t idx = 0;
  while (idx < numOfLoads) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    ++idx;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      continue;  // no vgroup acquired for this vgId: nothing to update
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
955

956
/*
 * Build a new dnode object from the create request and submit it through a
 * "create-dnode" transaction. On success the dnode's fqdn is also added to
 * the IP white list (the result of that update is ignored).
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pCommitRaw = NULL;
  STrans  *pTrans = NULL;

  // Fill in the new dnode object.
  SDnodeObj newDnode = {0};
  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&newDnode);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  // Clear the local pointer so the cleanup below does not free the raw that
  // was appended to the transaction.
  pCommitRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  // TODO: check the return value
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, newDnode.fqdn, IP_WHITE_ADD, 1);

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  return code;
}
998

999
/*
 * Answer a "dnode list" request: collect one single-endpoint SEpSet per dnode
 * stored in the sdb, serialize the list and attach it to pReq as the response.
 *
 * Returns 0 on success, otherwise an error code. On failure no response
 * buffer is attached and any buffer allocated here is released.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  int32_t       rspLen = 0;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SEpSet epSet = {0};
    epSet.numOfEps = 1;
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: drop the object and close the iterator.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only measures the encoded size.
  rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    if (rspLen < 0) code = rspLen;  // a zero length keeps the generic -1 failure
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // Never report success (code 0) without a response attached.
    code = (rspLen < 0) ? rspLen : -1;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // ownership moved to pReq
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  if (pRsp != NULL) {
    // Allocated here but never attached to pReq: free it to avoid a leak.
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1059

1060
static void getSlowLogScopeString(int32_t scope, char* result){
1,455✔
1061
  if(scope == SLOW_LOG_TYPE_NULL) {
1,455!
1062
    (void)strcat(result, "NONE");
×
1063
    return;
×
1064
  }
1065
  while(scope > 0){
5,791✔
1066
    if(scope & SLOW_LOG_TYPE_QUERY) {
4,335✔
1067
      (void)strcat(result, "QUERY");
1,456✔
1068
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,456✔
1069
    } else if(scope & SLOW_LOG_TYPE_INSERT) {
2,879✔
1070
      (void)strcat(result, "INSERT");
1,440✔
1071
      scope &= ~SLOW_LOG_TYPE_INSERT;
1,440✔
1072
    } else if(scope & SLOW_LOG_TYPE_OTHERS) {
1,439!
1073
      (void)strcat(result, "OTHERS");
1,440✔
1074
      scope &= ~SLOW_LOG_TYPE_OTHERS;
1,440✔
1075
    } else{
1076
      (void)printf("invalid slow log scope:%d", scope);
×
1077
      return;
×
1078
    }
1079

1080
    if(scope > 0) {
4,336✔
1081
      (void)strcat(result, "|");
2,879✔
1082
    }
1083
  }
1084
}
1085

1086
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
11✔
1087
  SShowVariablesRsp rsp = {0};
11✔
1088
  int32_t           code = -1;
11✔
1089

1090
  if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIABLES) != 0) {
11!
1091
    goto _OVER;
×
1092
  }
1093

1094
  rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
11✔
1095
  if (NULL == rsp.variables) {
11!
1096
    mError("failed to alloc SVariablesInfo array while process show variables req");
×
1097
    code = terrno;
×
1098
    goto _OVER;
×
1099
  }
1100

1101
  SVariablesInfo info = {0};
11✔
1102

1103
  (void)strcpy(info.name, "statusInterval");
11✔
1104
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
11✔
1105
  (void)strcpy(info.scope, "server");
11✔
1106
  // fill info.info
1107
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1108
    code = terrno;
×
1109
    goto _OVER;
×
1110
  }
1111

1112
  (void)strcpy(info.name, "timezone");
11✔
1113
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
11✔
1114
  (void)strcpy(info.scope, "both");
11✔
1115
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1116
    code = terrno;
×
1117
    goto _OVER;
×
1118
  }
1119

1120
  (void)strcpy(info.name, "locale");
11✔
1121
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
11✔
1122
  (void)strcpy(info.scope, "both");
11✔
1123
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1124
    code = terrno;
×
1125
    goto _OVER;
×
1126
  }
1127

1128
  (void)strcpy(info.name, "charset");
11✔
1129
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
11✔
1130
  (void)strcpy(info.scope, "both");
11✔
1131
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1132
    code = terrno;
×
1133
    goto _OVER;
×
1134
  }
1135

1136
  (void)strcpy(info.name, "monitor");
11✔
1137
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
11✔
1138
  (void)strcpy(info.scope, "server");
11✔
1139
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1140
    code = terrno;
×
1141
    goto _OVER;
×
1142
  }
1143

1144
  (void)strcpy(info.name, "monitorInterval");
11✔
1145
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
11✔
1146
  (void)strcpy(info.scope, "server");
11✔
1147
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1148
    code = terrno;
×
1149
    goto _OVER;
×
1150
  }
1151

1152
  (void)strcpy(info.name, "slowLogThreshold");
11✔
1153
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
11✔
1154
  (void)strcpy(info.scope, "server");
11✔
1155
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1156
    code = terrno;
×
1157
    goto _OVER;
×
1158
  }
1159

1160
  (void)strcpy(info.name, "slowLogMaxLen");
11✔
1161
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
11✔
1162
  (void)strcpy(info.scope, "server");
11✔
1163
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1164
    code = terrno;
×
1165
    goto _OVER;
×
1166
  }
1167

1168
  char scopeStr[64] = {0};
11✔
1169
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
11✔
1170
  (void)strcpy(info.name, "slowLogScope");
11✔
1171
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
11✔
1172
  (void)strcpy(info.scope, "server");
11✔
1173
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1174
    code = terrno;
×
1175
    goto _OVER;
×
1176
  }
1177

1178
  int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
11✔
1179
  void   *pRsp = rpcMallocCont(rspLen);
11✔
1180
  if (pRsp == NULL) {
11!
1181
    code = terrno;
×
1182
    goto _OVER;
×
1183
  }
1184

1185
  if ((rspLen = tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp)) <= 0) {
11!
1186
    code = rspLen;
×
1187
    goto _OVER;
×
1188
  }
1189

1190
  pReq->info.rspLen = rspLen;
11✔
1191
  pReq->info.rsp = pRsp;
11✔
1192
  code = 0;
11✔
1193

1194
_OVER:
11✔
1195

1196
  if (code != 0) {
11!
1197
    mError("failed to get show variables info since %s", tstrerror(code));
×
1198
  }
1199

1200
  tFreeSShowVariablesRsp(&rsp);
11✔
1201
  TAOS_RETURN(code);
11✔
1202
}
1203

1204
/*
 * Handle a CREATE DNODE request: check grants and the caller's privilege,
 * validate the endpoint, refuse duplicates, then start the create-dnode
 * transaction and write an audit record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been
 * started, otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};

  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  // Refuse to create a dnode whose "fqdn:port" endpoint is already registered.
  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // Bounded formatting: the audit object name can never overrun its buffer.
  char obj[200] = {0};
  (void)snprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1252

1253
/* Implementation hook; a stub is provided below when TD_ENTERPRISE is not defined. */
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

/* Handle a RESTORE DNODE request by delegating to the implementation hook. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and reports success. */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1260

1261
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
22✔
1262
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1263
  int32_t  code = -1;
22✔
1264
  SSdbRaw *pRaw = NULL;
22✔
1265
  STrans  *pTrans = NULL;
22✔
1266

1267
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
22✔
1268
  if (pTrans == NULL) {
22!
1269
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1270
    if (terrno != 0) code = terrno;
×
1271
    goto _OVER;
×
1272
  }
1273
  mndTransSetSerial(pTrans);
22✔
1274
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
22!
1275
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
22!
1276

1277
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1278
  if (pRaw == NULL) {
22!
1279
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1280
    if (terrno != 0) code = terrno;
×
1281
    goto _OVER;
×
1282
  }
1283
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
22!
1284
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
22!
1285
  pRaw = NULL;
22✔
1286

1287
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1288
  if (pRaw == NULL) {
22!
1289
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1290
    if (terrno != 0) code = terrno;
×
1291
    goto _OVER;
×
1292
  }
1293
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
22!
1294
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
22!
1295
  pRaw = NULL;
22✔
1296

1297
  if (pMObj != NULL) {
22✔
1298
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1299
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
4!
1300
  }
1301

1302
  if (pQObj != NULL) {
22✔
1303
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1304
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
3!
1305
  }
1306

1307
  if (pSObj != NULL) {
22✔
1308
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1309
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
3!
1310
  }
1311

1312
  if (numOfVnodes > 0) {
22✔
1313
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
16!
1314
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
16✔
1315
  }
1316

1317
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
20!
1318

1319
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
20✔
1320
                                   1);  // TODO: check the return value
1321
  code = 0;
20✔
1322

1323
_OVER:
22✔
1324
  mndTransDrop(pTrans);
22✔
1325
  sdbFreeRaw(pRaw);
22✔
1326
  TAOS_RETURN(code);
22✔
1327
}
1328

1329
/*
 * Report whether the dnode has nothing on it: no qnode, no snode, no mnode
 * and no vnodes. The checks run in that order and stop at the first hit.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SMnodeObj *pMnodeObj = NULL;
  SSnodeObj *pSnodeObj = NULL;
  SQnodeObj *pQnodeObj = mndAcquireQnode(pMnode, dnodeId);

  if (pQnodeObj == NULL) {
    pSnodeObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSnodeObj == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        // No service nodes found; the dnode is empty only if it also has no vnodes.
        empty = !(mndGetVnodesNum(pMnode, dnodeId) > 0);
      }
    }
  }

  // Release whatever was acquired; pointers that were never set stay NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnodeObj);
  mndReleaseSnode(pMnode, pSnodeObj);
  return empty;
}
1354

1355
/*
 * Handle a DROP DNODE request.
 *
 * The dnode is looked up by id first and by its "fqdn:port" endpoint as a
 * fallback. Dropping is refused when it would remove the last mnode or the
 * mnode on this node (selfDnodeId), when force/unsafe is requested for an
 * online dnode, or when a non-empty dnode is offline and force is not set.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the drop transaction has been
 * started, otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): this checks MND_OPER_DROP_MNODE although the request drops a
  // dnode - confirm whether a dnode-specific operation code was intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a fixed format string; the fqdn comes from the
    // request and must never be interpreted as a format itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode actually resolved: when it was found through its
  // endpoint, dropReq.dnodeId did not match any dnode.
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)snprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1439

1440
/*
 * Convert an mnode-level config request into the dnode-level request.
 *
 * Two input shapes are accepted:
 *   - config = "key value" with an empty value field;
 *   - config = "key" with a non-empty value field.
 * Any other combination is rejected with TSDB_CODE_INVALID_CFG.
 */
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
  int32_t     code = 0;
  const char *cfg = pMCfgReq->config;

  // The option name runs up to the first space or the end of the string.
  size_t optLen = 0;
  while (cfg[optLen] != '\0' && cfg[optLen] != ' ') {
    optLen++;
  }

  (void)strncpy(pDCfgReq->config, pMCfgReq->config, optLen);
  pDCfgReq->config[optLen] = 0;

  bool valueInConfig = (cfg[optLen] == ' ');
  bool valueInField = (strlen(pMCfgReq->value) != 0);

  // The value must come from exactly one of the two places.
  if (valueInConfig == valueInField) {
    mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
    code = TSDB_CODE_INVALID_CFG;
    TAOS_RETURN(code);
  }

  if (valueInConfig) {
    // 'key value'
    (void)strcpy(pDCfgReq->value, cfg + optLen + 1);
  } else {
    // 'key' 'value'
    (void)strcpy(pDCfgReq->value, pMCfgReq->value);
  }

  TAOS_RETURN(code);
}
1471

1472
/**
 * Send a TDMT_DND_CONFIG_DNODE request to one dnode, or to every dnode.
 *
 * @param pMnode    mnode whose sdb is iterated
 * @param dnodeId   target dnode id; -1 or 0 selects every dnode
 * @param pDcfgReq  request to serialize and send
 * @return the result of the last tmsgSendReq() call, the serializer's result
 *         when serialization fails, or TSDB_CODE_MND_DNODE_NOT_EXIST when no
 *         request was sent at all
 */
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = -1;  // -1 means "nothing sent yet"
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      if (bufLen < 0) {
        // The sizing pass already failed; do not hand a negative size to the allocator.
        code = bufLen;
        sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
        sdbRelease(pSdb, pDnode);
        return code;
      }

      void *pBuf = rpcMallocCont(bufLen);
      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          // Leaving the loop early: give back the buffer, the iterator and the
          // dnode object, none of which the normal loop tail will see.
          code = bufLen;
          rpcFreeCont(pBuf);
          sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
          sdbRelease(pSdb, pDnode);
          return code;
        }
        mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
        code = tmsgSendReq(&epSet, &rpcMsg);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  if (code == -1) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
  }
  TAOS_RETURN(code);
}
1505

1506
/**
 * Handle an "alter dnode" config request received by the mnode.
 *
 * Steps: deserialize the request, check the MND_OPER_CONFIG_DNODE privilege,
 * translate it into a dnode-side request (with special handling for
 * "resetlog" and, in enterprise builds, "s3blocksize"), write an audit
 * record, and forward the request through mndSendCfgDnodeReq().
 *
 * Every failure after deserialization leaves through _err_out so the
 * deserialized request is always freed.
 *
 * @param pReq  incoming rpc message
 * @return 0 on success, otherwise an error code
 */
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  SDCfgDnodeReq dcfgReq = {0};
  int8_t        updateIpWhiteList = 0;
  int32_t       dnodeId = 0;

  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    goto _err_out;
  }

  if (strcasecmp(cfgReq.config, "resetlog") == 0) {
    (void)strcpy(dcfgReq.config, "resetlog");
#ifdef TD_ENTERPRISE
  } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
    int32_t optLen = strlen("s3blocksize");
    int32_t flag = -1;
    // Use the function-level 'code' and the shared exit so cfgReq is freed on failure.
    TAOS_CHECK_GOTO(mndMCfgGetValInt32(&cfgReq, optLen, &flag), NULL, _err_out);

    if (flag > 1024 * 1024 || (flag > -1 && flag < 1024) || flag < -1) {
      mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]",
             cfgReq.dnodeId, flag);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }

    (void)strcpy(dcfgReq.config, "s3blocksize");
    (void)snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#endif
  } else {
    TAOS_CHECK_GOTO(mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
    if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
      mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }
    if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
      updateIpWhiteList = 1;
    }

    TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true), NULL, _err_out);
  }

  {  // audit
    char obj[50] = {0};
    (void)snprintf(obj, sizeof(obj), "%d", cfgReq.dnodeId);

    auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
  }

  // Take what is still needed out of the request before releasing it.
  dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);

  code = mndSendCfgDnodeReq(pMnode, dnodeId, &dcfgReq);

  // The whitelist refresh is attempted whether or not the send succeeded.
  if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
  TAOS_RETURN(code);

_err_out:
  tFreeSMCfgDnodeReq(&cfgReq);
  TAOS_RETURN(code);
}
1572

1573
/**
 * Response handler for a dnode config request.
 * The response content is not inspected; the arrival is only logged.
 *
 * @param pRsp  response message (unused)
 * @return always 0
 */
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
  const int32_t result = 0;

  mInfo("config rsp from dnode");

  return result;
}
1577

1578
/**
 * Broadcast a TDMT_DND_CREATE_ENCRYPT_KEY request to the selected online dnodes.
 *
 * The mnode-wide 'encrypting' flag is taken with a compare-and-swap so only
 * one key creation runs at a time. It is cleared here whenever no request
 * could be sent (including when serialization fails); otherwise clearing is
 * left to the response handler once all responses have arrived.
 *
 * @param pReq      original rpc message (used for logging/tracing)
 * @param dnodeId   target dnode id; -1 or 0 selects every dnode
 * @param pDcfgReq  request whose value field carries the key
 * @return 0 on success, otherwise an error code (terrno is set if it was 0)
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;  // referenced by the mG* log macros

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          // Stop iterating, but fall through to the nEncrypt check below so the
          // 'encrypting' flag is not left set when nothing was sent.
          code = bufLen;
          rpcFreeCont(pBuf);
          sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
          sdbRelease(pSdb, pDnode);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the flag; clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1655

1656
/**
 * Entry point for a "create encrypt_key" request.
 *
 * Enterprise builds deserialize the request, check the MND_OPER_CONFIG_DNODE
 * privilege, require the option name to be "encrypt_key" and hand over to
 * mndProcessCreateEncryptKeyReqImpl(). Other builds return 0 without doing
 * anything.
 *
 * @param pReq  incoming rpc message
 * @return 0 on success, otherwise an error code
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#ifdef TD_ENTERPRISE
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 includes the terminator, so only the exact name matches.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  (void)strcpy(dcfgReq.config, cfgReq.config);
  (void)strcpy(dcfgReq.value, cfgReq.value);

  // Read the target id before the request is released.
  int32_t dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1685

1686
/**
 * Handle one dnode's response to a create-encrypt-key request.
 *
 * Bumps the success or failure counter and clears the mnode's 'encrypting'
 * flag once successes plus failures reach the number of requests sent.
 * Both counters are read for that comparison; using only the counter that
 * was just incremented would never reach the total when responses are a mix
 * of successes and failures.
 *
 * @param pRsp  response message; pRsp->code == 0 counts as success
 * @return always 0
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;  // referenced by the mG* log macros
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1710

1711
/**
 * Fill a result block with a fixed list of mnode-visible config options.
 *
 * Each row has two var-string columns: option name and option value. The
 * values are formatted from the corresponding global settings.
 *
 * @param pReq    request message (only used to reach the mnode)
 * @param pShow   show object; its numOfRows is increased by the rows written
 * @param pBlock  output data block
 * @param rows    not consulted by this function
 * @return number of rows written into pBlock
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfRows = 0;
  int32_t totalRows = 0;
  char   *pWrite = NULL;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  // Collect name/value pairs; the index advances after each value is formatted.
  cfgOpts[totalRows] = "statusInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);

  cfgOpts[totalRows] = "timezone";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[totalRows] = "locale";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[totalRows] = "charset";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[totalRows] = "monitor";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[totalRows] = "monitorInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[totalRows] = "slowLogThreshold";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[totalRows] = "slowLogMaxLen";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[totalRows] = "slowLogScope";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  // numOfRows doubles as the row index; it only advances once both columns are set.
  while (numOfRows < totalRows) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[numOfRows], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, numOfRows, (const char *)nameBuf, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[numOfRows], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, numOfRows, (const char *)valueBuf, false), &lino, _OVER);

    numOfRows++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1782

1783
/* Cancel hook for the config listing; there is nothing to cancel, so this is a no-op. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1784

1785
/**
 * Fill a result block with one row per dnode.
 *
 * Columns, in order: id, endpoint, number of vnodes, supported vnodes,
 * status, create time, reboot time, offline reason and, in enterprise
 * builds, machine id.
 *
 * The offline-reason column is formatted into the on-stack buffer that the
 * other string columns already use, so no heap allocation is needed for it.
 *
 * @param pReq    request message (only used to reach the mnode)
 * @param pShow   show object holding the sdb iterator; numOfRows is increased
 * @param pBlock  output data block
 * @param rows    maximum number of rows to write
 * @return number of rows written into pBlock
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    // dnode id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // endpoint, limited to the width declared by the column schema
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // number of vnodes currently on this dnode
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // status text; a trailing '*' marks a creating/dropping dnode that is not online
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // offline reason (empty while online); bounded by the size of the stack buffer
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    STR_WITH_MAXSIZE_TO_VARSTR(buf, reason, sizeof(buf));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1867

1868
/* Cancel hook for the dnode listing: cancels the pending SDB_DNODE fetch held in pIter. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1872

1873
// get int32_t value from 'SMCfgDnodeReq'
1874
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
×
1875
  int32_t code = 0;
×
1876
  if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
×
1877
    goto _err;
×
1878
  }
1879

1880
  if (' ' == pMCfgReq->config[optLen]) {
×
1881
    // 'key value'
1882
    if (strlen(pMCfgReq->value) != 0) goto _err;
×
1883
    *pOutValue = atoi(pMCfgReq->config + optLen + 1);
×
1884
  } else {
1885
    // 'key' 'value'
1886
    if (strlen(pMCfgReq->value) == 0) goto _err;
×
1887
    *pOutValue = atoi(pMCfgReq->value);
×
1888
  }
1889

1890
  TAOS_RETURN(code);
×
1891

1892
_err:
×
1893
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
×
1894
  code = TSDB_CODE_INVALID_CFG;
×
1895
  TAOS_RETURN(code);
×
1896
}
1897

1898
/**
 * Collect a copy of the fqdn of every dnode.
 *
 * Each element of the returned array is a 'char *' produced by taosStrdup().
 * A dnode whose fqdn cannot be duplicated or stored is skipped (and logged)
 * rather than aborting the whole collection.
 *
 * @param pMnode  mnode whose sdb is iterated
 * @return array of duplicated fqdn strings, or NULL if the array could not be created
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Never store a NULL entry; skip this dnode instead.
      mError("dnode:%d, failed to dup fqdn, but continue at this time", pObj->id);
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      taosMemoryFreeClear(fqdn);  // the array did not take the copy
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc