• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3529

14 Nov 2024 01:56PM UTC coverage: 60.888% (-0.02%) from 60.905%
#3529

push

travis-ci

web-flow
Merge pull request #28764 from taosdata/docs/TS-4937

doc(arch/last): new section for last/last_row cache

119990 of 252020 branches covered (47.61%)

Branch coverage included in aggregate %.

200800 of 274829 relevant lines covered (73.06%)

15624555.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.16
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndCluster.h"
21
#include "mndDb.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tjson.h"
32
#include "tmisce.h"
33
#include "tunit.h"
34

35
#define TSDB_DNODE_VER_NUMBER   2
36
#define TSDB_DNODE_RESERVE_SIZE 40
37

38
static const char *offlineReason[] = {
39
    "",
40
    "status msg timeout",
41
    "status not received",
42
    "version not match",
43
    "dnodeId not match",
44
    "clusterId not match",
45
    "statusInterval not match",
46
    "timezone not match",
47
    "locale not match",
48
    "charset not match",
49
    "ttlChangeOnWrite not match",
50
    "enableWhiteList not match",
51
    "encryptionKey not match",
52
    "monitor not match",
53
    "monitor switch not match",
54
    "monitor interval not match",
55
    "monitor slow log threshold not match",
56
    "monitor slow log sql max len not match",
57
    "monitor slow log scope not match",
58
    "unknown",
59
};
60

61
enum {
62
  DND_ACTIVE_CODE,
63
  DND_CONN_ACTIVE_CODE,
64
};
65

66
enum {
67
  DND_CREATE,
68
  DND_ADD,
69
  DND_DROP,
70
};
71

72
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
73
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
74
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
75
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
76
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
78
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
79
static int32_t  mndProcessShowVariablesReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
100

101
#ifdef _GRANT
102
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
103
#else
104
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
105
#endif
106

107
int32_t mndInitDnode(SMnode *pMnode) {
1,982✔
108
  SSdbTable table = {
1,982✔
109
      .sdbType = SDB_DNODE,
110
      .keyType = SDB_KEY_INT32,
111
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
112
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
113
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
114
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
115
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
116
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
117
  };
118

119
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
1,982✔
120
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
1,982✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
1,982✔
122
  mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
1,982✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
1,982✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
1,982✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
1,982✔
126
  mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
1,982✔
127
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
1,982✔
128
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
1,982✔
129
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
1,982✔
130
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
1,982✔
131
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
1,982✔
132
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
1,982✔
133

134
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
1,982✔
135
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
1,982✔
136
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
1,982✔
137
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
1,982✔
138

139
  return sdbSetTable(pMnode->pSdb, table);
1,982✔
140
}
141

142
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
143
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
144
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
145
void          mndCleanupDnode(SMnode *pMnode) {}
1,981✔
146

147
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
1,474✔
148
  int32_t  code = -1;
1,474✔
149
  SSdbRaw *pRaw = NULL;
1,474✔
150
  STrans  *pTrans = NULL;
1,474✔
151

152
  SDnodeObj dnodeObj = {0};
1,474✔
153
  dnodeObj.id = 1;
1,474✔
154
  dnodeObj.createdTime = taosGetTimestampMs();
1,474✔
155
  dnodeObj.updateTime = dnodeObj.createdTime;
1,474✔
156
  dnodeObj.port = tsServerPort;
1,474✔
157
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
1,474✔
158
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
1,474✔
159
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
1,474✔
160
  char *machineId = NULL;
1,474✔
161
  code = tGetMachineId(&machineId);
1,474✔
162
  if (machineId) {
1,474!
163
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
1,474✔
164
    taosMemoryFreeClear(machineId);
1,474!
165
  } else {
166
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
167
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
168
    goto _OVER;
×
169
#endif
170
  }
171

172
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
1,474✔
173
  if (pTrans == NULL) {
1,474!
174
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
175
    if (terrno != 0) code = terrno;
×
176
    goto _OVER;
×
177
  }
178
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
1,474!
179

180
  pRaw = mndDnodeActionEncode(&dnodeObj);
1,474✔
181
  if (pRaw == NULL) {
1,474!
182
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
183
    if (terrno != 0) code = terrno;
×
184
    goto _OVER;
×
185
  }
186
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
1,474!
187
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
1,474!
188
  pRaw = NULL;
1,474✔
189

190
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
1,474!
191
  code = 0;
1,474✔
192
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
1,474✔
193
                                   1);  // TODO: check the return value
194

195
_OVER:
1,474✔
196
  mndTransDrop(pTrans);
1,474✔
197
  sdbFreeRaw(pRaw);
1,474✔
198
  return code;
1,474✔
199
}
200

201
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
7,423✔
202
  int32_t code = 0;
7,423✔
203
  int32_t lino = 0;
7,423✔
204
  terrno = TSDB_CODE_OUT_OF_MEMORY;
7,423✔
205

206
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
7,423✔
207
  if (pRaw == NULL) goto _OVER;
7,423!
208

209
  int32_t dataPos = 0;
7,423✔
210
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
7,423!
211
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
7,423!
212
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
7,423!
213
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
7,423!
214
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
7,423!
215
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
7,423!
216
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
7,423!
217
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
7,423!
218
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
7,423!
219
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
7,423!
220

221
  terrno = 0;
7,423✔
222

223
_OVER:
7,423✔
224
  if (terrno != 0) {
7,423!
225
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
226
    sdbFreeRaw(pRaw);
×
227
    return NULL;
×
228
  }
229

230
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
7,423✔
231
  return pRaw;
7,423✔
232
}
233

234
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
4,557✔
235
  int32_t code = 0;
4,557✔
236
  int32_t lino = 0;
4,557✔
237
  terrno = TSDB_CODE_OUT_OF_MEMORY;
4,557✔
238
  SSdbRow   *pRow = NULL;
4,557✔
239
  SDnodeObj *pDnode = NULL;
4,557✔
240

241
  int8_t sver = 0;
4,557✔
242
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
4,557!
243
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
4,557!
244
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
245
    goto _OVER;
×
246
  }
247

248
  pRow = sdbAllocRow(sizeof(SDnodeObj));
4,557✔
249
  if (pRow == NULL) goto _OVER;
4,557!
250

251
  pDnode = sdbGetRowObj(pRow);
4,557✔
252
  if (pDnode == NULL) goto _OVER;
4,557!
253

254
  int32_t dataPos = 0;
4,557✔
255
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
4,557!
256
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
4,557!
257
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
4,557!
258
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
4,557!
259
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
4,557!
260
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
4,557!
261
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
4,557!
262
  if (sver > 1) {
4,557!
263
    int16_t keyLen = 0;
4,557✔
264
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
4,557!
265
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
4,557!
266
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
4,557!
267
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
4,557!
268
  }
269

270
  terrno = 0;
4,557✔
271
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
4,557!
272
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
273
  }
274

275
_OVER:
4,557✔
276
  if (terrno != 0) {
4,557!
277
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
278
    taosMemoryFreeClear(pRow);
×
279
    return NULL;
×
280
  }
281

282
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
4,557✔
283
  return pRow;
4,557✔
284
}
285

286
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
3,657✔
287
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
3,657✔
288
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
3,657✔
289

290
  char ep[TSDB_EP_LEN] = {0};
3,657✔
291
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
3,657✔
292
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
3,657✔
293
  return 0;
3,657✔
294
}
295

296
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
4,552✔
297
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
4,552✔
298
  return 0;
4,552✔
299
}
300

301
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
875✔
302
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
875!
303
  pOld->updateTime = pNew->updateTime;
875✔
304
#ifdef TD_ENTERPRISE
305
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
875✔
306
#endif
307
  return 0;
875✔
308
}
309

310
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
7,188,851✔
311
  SSdb      *pSdb = pMnode->pSdb;
7,188,851✔
312
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
7,188,851✔
313
  if (pDnode == NULL) {
7,188,856✔
314
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
615✔
315
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
25✔
316
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
590!
317
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
318
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
590!
319
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
590✔
320
    } else {
321
      terrno = TSDB_CODE_APP_ERROR;
×
322
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
323
    }
324
  }
325

326
  return pDnode;
7,188,848✔
327
}
328

329
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
7,191,250✔
330
  SSdb *pSdb = pMnode->pSdb;
7,191,250✔
331
  sdbRelease(pSdb, pDnode);
7,191,250✔
332
}
7,191,295✔
333

334
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
30,627✔
335
  SEpSet epSet = {0};
30,627✔
336
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
30,627✔
337
  return epSet;
30,627✔
338
}
339

340
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
×
341
  SEpSet     epSet = {0};
×
342
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
343
  if (!pDnode) return epSet;
×
344

345
  epSet = mndGetDnodeEpset(pDnode);
×
346

347
  mndReleaseDnode(pMnode, pDnode);
×
348
  return epSet;
×
349
}
350

351
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
2,879✔
352
  SSdb *pSdb = pMnode->pSdb;
2,879✔
353

354
  void *pIter = NULL;
2,879✔
355
  while (1) {
3,588✔
356
    SDnodeObj *pDnode = NULL;
6,467✔
357
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
6,467✔
358
    if (pIter == NULL) break;
6,467✔
359

360
    if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
5,523✔
361
      sdbCancelFetch(pSdb, pIter);
1,935✔
362
      return pDnode;
1,935✔
363
    }
364

365
    sdbRelease(pSdb, pDnode);
3,588✔
366
  }
367

368
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
944✔
369
  return NULL;
944✔
370
}
371

372
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
341✔
373
  SSdb *pSdb = pMnode->pSdb;
341✔
374

375
  void *pIter = NULL;
341✔
376
  while (1) {
341✔
377
    SDnodeObj *pDnode = NULL;
682✔
378
    ESdbStatus objStatus = 0;
682✔
379
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
682✔
380
    if (pIter == NULL) break;
682!
381

382
    if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
682✔
383
      sdbCancelFetch(pSdb, pIter);
341✔
384
      return pDnode;
341✔
385
    }
386

387
    sdbRelease(pSdb, pDnode);
341✔
388
  }
389

390
  return NULL;
×
391
}
392

393
int32_t mndGetDnodeSize(SMnode *pMnode) {
585,422✔
394
  SSdb *pSdb = pMnode->pSdb;
585,422✔
395
  return sdbGetSize(pSdb, SDB_DNODE);
585,422✔
396
}
397

398
int32_t mndGetDbSize(SMnode *pMnode) {
×
399
  SSdb *pSdb = pMnode->pSdb;
×
400
  return sdbGetSize(pSdb, SDB_DB);
×
401
}
402

403
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
544,831✔
404
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
544,831✔
405
  if (interval > 5000 * (int64_t)tsStatusInterval) {
544,831✔
406
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
6,480✔
407
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
72✔
408
    }
409
    return false;
6,480✔
410
  }
411
  return true;
538,351✔
412
}
413

414
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
4,025✔
415
  SSdb *pSdb = pMnode->pSdb;
4,025✔
416

417
  int32_t numOfEps = 0;
4,025✔
418
  void   *pIter = NULL;
4,025✔
419
  while (1) {
13,482✔
420
    SDnodeObj *pDnode = NULL;
17,507✔
421
    ESdbStatus objStatus = 0;
17,507✔
422
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
17,507✔
423
    if (pIter == NULL) break;
17,507✔
424

425
    SDnodeEp dnodeEp = {0};
13,482✔
426
    dnodeEp.id = pDnode->id;
13,482✔
427
    dnodeEp.ep.port = pDnode->port;
13,482✔
428
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
13,482✔
429
    sdbRelease(pSdb, pDnode);
13,482✔
430

431
    dnodeEp.isMnode = 0;
13,482✔
432
    if (mndIsMnode(pMnode, pDnode->id)) {
13,482✔
433
      dnodeEp.isMnode = 1;
5,581✔
434
    }
435
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
13,482!
436
      mError("failed to put ep into array, but continue at this call");
×
437
    }
438
  }
439
}
4,025✔
440

441
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
54,490✔
442
  SSdb   *pSdb = pMnode->pSdb;
54,490✔
443
  int32_t code = 0;
54,490✔
444

445
  int32_t numOfEps = 0;
54,490✔
446
  void   *pIter = NULL;
54,490✔
447
  while (1) {
239,909✔
448
    SDnodeObj *pDnode = NULL;
294,399✔
449
    ESdbStatus objStatus = 0;
294,399✔
450
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
294,399✔
451
    if (pIter == NULL) break;
294,397✔
452

453
    SDnodeInfo dInfo;
454
    dInfo.id = pDnode->id;
239,907✔
455
    dInfo.ep.port = pDnode->port;
239,907✔
456
    dInfo.offlineReason = pDnode->offlineReason;
239,907✔
457
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
239,907✔
458
    sdbRelease(pSdb, pDnode);
239,907✔
459
    if (mndIsMnode(pMnode, pDnode->id)) {
239,910✔
460
      dInfo.isMnode = 1;
68,459✔
461
    } else {
462
      dInfo.isMnode = 0;
171,451✔
463
    }
464

465
    if(taosArrayPush(pDnodeInfo, &dInfo) == NULL){
239,909!
466
      code = terrno;
×
467
      sdbCancelFetch(pSdb, pIter);
×
468
      break;
×
469
    }
470
  }
471
  TAOS_RETURN(code);
54,490✔
472
}
473

474
#define CHECK_MONITOR_PARA(para,err) \
475
if (pCfg->monitorParas.para != para) { \
476
  mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
477
  terrno = err; \
478
  return err;\
479
}
480

481
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
4,025✔
482
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
4,025!
483
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
4,025!
484
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
4,025!
485
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
4,025!
486
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
4,025!
487

488
  if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
4,025!
489
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
×
490
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
491
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
492
  }
493

494
  if (pCfg->statusInterval != tsStatusInterval) {
4,025!
495
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
496
           tsStatusInterval);
497
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
498
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
499
  }
500

501
  if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
4,025!
502
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
503
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
504
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
505
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
506
  }
507

508
  if (0 != strcasecmp(pCfg->locale, tsLocale)) {
4,025!
509
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
510
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
511
    return DND_REASON_LOCALE_NOT_MATCH;
×
512
  }
513

514
  if (0 != strcasecmp(pCfg->charset, tsCharset)) {
4,025!
515
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
516
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
517
    return DND_REASON_CHARSET_NOT_MATCH;
×
518
  }
519

520
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
4,025!
521
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
522
           tsTtlChangeOnWrite);
523
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
524
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
525
  }
526
  int8_t enable = tsEnableWhiteList ? 1 : 0;
4,025✔
527
  if (pCfg->enableWhiteList != enable) {
4,025!
528
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
529
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
530
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
531
  }
532

533
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
4,025!
534
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
4,025!
535
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
536
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
537
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
538
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
539
  }
540

541
  return DND_REASON_ONLINE;
4,025✔
542
}
543

544
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
1,172,580✔
545
  bool stateChanged = false;
1,172,580✔
546
  bool roleChanged = pGid->syncState != pVload->syncState ||
3,514,269✔
547
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
2,331,321!
548
                     pGid->roleTimeMs != pVload->roleTimeMs;
1,158,741✔
549
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
1,172,580!
550
      pGid->startTimeMs != pVload->startTimeMs) {
1,158,073!
551
    mInfo(
14,507✔
552
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
553
        "canRead:%d, dnode:%d",
554
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
555
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
556
    pGid->syncState = pVload->syncState;
14,506✔
557
    pGid->syncTerm = pVload->syncTerm;
14,506✔
558
    pGid->syncRestore = pVload->syncRestore;
14,506✔
559
    pGid->syncCanRead = pVload->syncCanRead;
14,506✔
560
    pGid->startTimeMs = pVload->startTimeMs;
14,506✔
561
    pGid->roleTimeMs = pVload->roleTimeMs;
14,506✔
562
    stateChanged = true;
14,506✔
563
  }
564
  return stateChanged;
1,172,579✔
565
}
566

567
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
65,348✔
568
  bool stateChanged = false;
65,348✔
569
  bool roleChanged = pObj->syncState != pMload->syncState ||
194,098✔
570
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
128,689!
571
                     pObj->roleTimeMs != pMload->roleTimeMs;
63,341✔
572
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
65,348✔
573
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
2,036!
574
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
575
          pObj->syncTerm, pMload->syncTerm);
576
    pObj->syncState = pMload->syncState;
2,036✔
577
    pObj->syncTerm = pMload->syncTerm;
2,036✔
578
    pObj->syncRestore = pMload->syncRestore;
2,036✔
579
    pObj->roleTimeMs = pMload->roleTimeMs;
2,036✔
580
    stateChanged = true;
2,036✔
581
  }
582
  return stateChanged;
65,348✔
583
}
584

585
extern char* tsMonFwUri;
586
extern char* tsMonSlowLogUri;
587
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
10,509✔
588
  SMnode    *pMnode = pReq->info.node;
10,509✔
589
  SStatisReq statisReq = {0};
10,509✔
590
  int32_t    code = -1;
10,509✔
591

592
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
10,509!
593

594
  if (tsMonitorLogProtocol) {
10,509!
595
    mInfo("process statis req,\n %s", statisReq.pCont);
×
596
  }
597

598
  if (statisReq.type == MONITOR_TYPE_COUNTER){
10,509✔
599
    monSendContent(statisReq.pCont, tsMonFwUri);
8,598✔
600
  }else if(statisReq.type == MONITOR_TYPE_SLOW_LOG){
1,911!
601
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
1,911✔
602
  }
603

604
  tFreeSStatisReq(&statisReq);
10,509✔
605
  return 0;
10,509✔
606
}
607

608
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
59,155✔
609
  mTrace("process audit req:%p", pReq);
59,155✔
610
  if (tsEnableAudit && tsEnableAuditDelete) {
59,155!
611
    SMnode   *pMnode = pReq->info.node;
59,155✔
612
    SAuditReq auditReq = {0};
59,155✔
613

614
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
59,155!
615

616
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
59,154✔
617

618
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
59,154✔
619
                   auditReq.sqlLen);
620

621
    tFreeSAuditReq(&auditReq);
59,152✔
622
  }
623
  return 0;
59,154✔
624
}
625

626
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
462✔
627
  int32_t       code = 0, lino = 0;
462✔
628
  SDnodeInfoReq infoReq = {0};
462✔
629
  int32_t       contLen = 0;
462✔
630
  void         *pReq = NULL;
462✔
631

632
  infoReq.dnodeId = pDnode->id;
462✔
633
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
462✔
634

635
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
462!
636
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
637
  }
638
  pReq = rpcMallocCont(contLen);
462✔
639
  if (pReq == NULL) {
462!
640
    TAOS_RETURN(terrno);
×
641
  }
642

643
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
462!
644
    code = contLen;
×
645
    goto _exit;
×
646
  }
647

648
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
462✔
649
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
462!
650
_exit:
462✔
651
  if (code < 0) {
462!
652
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
653
  }
654
  TAOS_RETURN(code);
462✔
655
}
656

657
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
462✔
658
  int32_t       code = 0, lino = 0;
462✔
659
  SMnode       *pMnode = pReq->info.node;
462✔
660
  SDnodeInfoReq infoReq = {0};
462✔
661
  SDnodeObj    *pDnode = NULL;
462✔
662
  STrans       *pTrans = NULL;
462✔
663
  SSdbRaw      *pCommitRaw = NULL;
462✔
664

665
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
462!
666

667
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
462✔
668
  if (pDnode == NULL) {
462!
669
    TAOS_CHECK_EXIT(terrno);
×
670
  }
671

672
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
462✔
673
  if (pTrans == NULL) {
462!
674
    TAOS_CHECK_EXIT(terrno);
×
675
  }
676

677
  pDnode->updateTime = taosGetTimestampMs();
462✔
678

679
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
462!
680
    TAOS_CHECK_EXIT(terrno);
×
681
  }
682
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
462!
683
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
684
    TAOS_CHECK_EXIT(code);
×
685
  }
686
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
462!
687
  pCommitRaw = NULL;
462✔
688

689
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
462!
690
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
691
    TAOS_CHECK_EXIT(code);
×
692
  }
693

694
_exit:
462✔
695
  mndReleaseDnode(pMnode, pDnode);
462✔
696
  if (code != 0) {
462!
697
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
×
698
  }
699
  mndTransDrop(pTrans);
462✔
700
  sdbFreeRaw(pCommitRaw);
462✔
701
  TAOS_RETURN(code);
462✔
702
}
703

704
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
101,132✔
705
  SMnode    *pMnode = pReq->info.node;
101,132✔
706
  SStatusReq statusReq = {0};
101,132✔
707
  SDnodeObj *pDnode = NULL;
101,132✔
708
  int32_t    code = -1;
101,132✔
709

710
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
101,132!
711

712
  int64_t clusterid = mndGetClusterId(pMnode);
101,133✔
713
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
101,134!
714
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
715
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
716
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
717
    goto _OVER;
×
718
  }
719

720
  if (statusReq.dnodeId == 0) {
101,134✔
721
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
2,058✔
722
    if (pDnode == NULL) {
2,058✔
723
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
126!
724
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
126✔
725
      if (terrno != 0) code = terrno;
126!
726
      goto _OVER;
126✔
727
    }
728
  } else {
729
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
99,076✔
730
    if (pDnode == NULL) {
99,075✔
731
      int32_t err = terrno;
356✔
732
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
356✔
733
      if (pDnode != NULL) {
356✔
734
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
3✔
735
        terrno = err;
3✔
736
        goto _OVER;
3✔
737
      }
738

739
      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
353!
740
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
353✔
741
        terrno = err;
12✔
742
        goto _OVER;
12✔
743
      } else {
744
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
341✔
745
        if (pDnode == NULL) goto _OVER;
341!
746
      }
747
    }
748
  }
749

750
  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
100,992✔
751

752
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
100,992✔
753
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
100,992✔
754
  int64_t curMs = taosGetTimestampMs();
100,993✔
755
  bool    online = mndIsDnodeOnline(pDnode, curMs);
100,993✔
756
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
100,990✔
757
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
100,990✔
758
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
100,990✔
759
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
100,990✔
760
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
100,990✔
761
  bool    analVerChanged = (analVer != statusReq.analVer);
100,990✔
762
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
98,520!
763
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
199,510!
764
  const STraceId *trace = &pReq->info.traceId;
100,990✔
765
  mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
100,990!
766
          pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
767

768
  if (reboot) {
100,990✔
769
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
2,539✔
770
  }
771

772
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
1,275,598✔
773
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
1,174,608✔
774

775
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
1,174,608✔
776
    if (pVgroup != NULL) {
1,174,608✔
777
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,172,604!
778
        pVgroup->cacheUsage = pVload->cacheUsage;
1,125,367✔
779
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
1,125,367✔
780
        pVgroup->numOfTables = pVload->numOfTables;
1,125,367✔
781
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
1,125,367✔
782
        pVgroup->totalStorage = pVload->totalStorage;
1,125,367✔
783
        pVgroup->compStorage = pVload->compStorage;
1,125,367✔
784
        pVgroup->pointsWritten = pVload->pointsWritten;
1,125,367✔
785
      }
786
      bool stateChanged = false;
1,172,604✔
787
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
1,236,595✔
788
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
1,236,570✔
789
        if (pGid->dnodeId == statusReq.dnodeId) {
1,236,570✔
790
          if (pVload->startTimeMs == 0) {
1,172,579!
791
            pVload->startTimeMs = statusReq.rebootTime;
×
792
          }
793
          if (pVload->roleTimeMs == 0) {
1,172,579!
794
            pVload->roleTimeMs = statusReq.rebootTime;
×
795
          }
796
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
1,172,579✔
797
          break;
1,172,579✔
798
        }
799
      }
800
      if (stateChanged) {
1,172,604✔
801
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
14,506✔
802
        if (pDb != NULL && pDb->stateTs != curMs) {
14,506✔
803
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
8,857!
804
                pDb->stateTs, curMs);
805
          pDb->stateTs = curMs;
8,857✔
806
        }
807
        mndReleaseDb(pMnode, pDb);
14,506✔
808
      }
809
    }
810

811
    mndReleaseVgroup(pMnode, pVgroup);
1,174,608✔
812
  }
813

814
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
100,989✔
815
  if (pObj != NULL) {
100,993✔
816
    if (statusReq.mload.roleTimeMs == 0) {
65,348✔
817
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
6✔
818
    }
819
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
65,348✔
820
    mndReleaseMnode(pMnode, pObj);
65,348✔
821
  }
822

823
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
100,993✔
824
  if (pQnode != NULL) {
100,993✔
825
    pQnode->load = statusReq.qload;
39,696✔
826
    mndReleaseQnode(pMnode, pQnode);
39,696✔
827
  }
828

829
  if (needCheck) {
100,993✔
830
    if (statusReq.sver != tsVersion) {
4,025!
831
      if (pDnode != NULL) {
×
832
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
833
      }
834
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
835
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
836
      goto _OVER;
×
837
    }
838

839
    if (statusReq.dnodeId == 0) {
4,025✔
840
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
1,932!
841
    } else {
842
      if (statusReq.clusterId != pMnode->clusterId) {
2,093!
843
        if (pDnode != NULL) {
×
844
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
845
        }
846
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
847
               pMnode->clusterId);
848
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
849
        goto _OVER;
×
850
      }
851
    }
852

853
    // Verify whether the cluster parameters are consistent when status change from offline to ready
854
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
4,025✔
855
    if (pDnode->offlineReason != 0) {
4,025!
856
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
×
857
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
×
858
      goto _OVER;
×
859
    }
860

861
    if (!online) {
4,025✔
862
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
2,470!
863
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
864
    } else {
865
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
1,555!
866
            statusReq.dnodeVer, dnodeVer, reboot);
867
    }
868

869
    pDnode->rebootTime = statusReq.rebootTime;
4,025✔
870
    pDnode->numOfCores = statusReq.numOfCores;
4,025✔
871
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
4,025✔
872
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
4,025✔
873
    pDnode->memAvail = statusReq.memAvail;
4,025✔
874
    pDnode->memTotal = statusReq.memTotal;
4,025✔
875
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
4,025✔
876
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
4,025✔
877
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
4,025✔
878
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
462✔
879
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
462!
880
        goto _OVER;
×
881
      }
882
    }
883

884
    SStatusRsp statusRsp = {0};
4,025✔
885
    statusRsp.statusSeq++;
4,025✔
886
    statusRsp.analVer = analVer;
4,025✔
887
    statusRsp.dnodeVer = dnodeVer;
4,025✔
888
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
4,025✔
889
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
4,025✔
890
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
4,025✔
891
    if (statusRsp.pDnodeEps == NULL) {
4,025!
892
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
893
      goto _OVER;
×
894
    }
895

896
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
4,025✔
897
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
4,025✔
898

899
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
4,025✔
900
    void   *pHead = rpcMallocCont(contLen);
4,025✔
901
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
4,025✔
902
    taosArrayDestroy(statusRsp.pDnodeEps);
4,025✔
903
    if (contLen < 0) {
4,025!
904
      code = contLen;
×
905
      goto _OVER;
×
906
    }
907

908
    pReq->info.rspLen = contLen;
4,025✔
909
    pReq->info.rsp = pHead;
4,025✔
910
  }
911

912
  pDnode->accessTimes++;
100,993✔
913
  pDnode->lastAccessTime = curMs;
100,993✔
914
  code = 0;
100,993✔
915

916
_OVER:
101,134✔
917
  mndReleaseDnode(pMnode, pDnode);
101,134✔
918
  taosArrayDestroy(statusReq.pVloads);
101,134✔
919
  return mndUpdClusterInfo(pReq);
101,134✔
920
}
921

922
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
923
  SMnode    *pMnode = pReq->info.node;
×
924
  SNotifyReq notifyReq = {0};
×
925
  int32_t    code = 0;
×
926

927
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
928
    terrno = code;
×
929
    goto _OVER;
×
930
  }
931

932
  int64_t clusterid = mndGetClusterId(pMnode);
×
933
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
934
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
935
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
936
          notifyReq.clusterId, clusterid, tstrerror(code));
937
    goto _OVER;
×
938
  }
939

940
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
941
  for (int32_t v = 0; v < nVgroup; ++v) {
×
942
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
943

944
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
945
    if (pVgroup != NULL) {
×
946
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
947
      mndReleaseVgroup(pMnode, pVgroup);
×
948
    }
949
  }
950
  code = mndUpdClusterInfo(pReq);
×
951
_OVER:
×
952
  tFreeSNotifyReq(&notifyReq);
×
953
  return code;
×
954
}
955

956
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
464✔
957
  int32_t  code = -1;
464✔
958
  SSdbRaw *pRaw = NULL;
464✔
959
  STrans  *pTrans = NULL;
464✔
960

961
  SDnodeObj dnodeObj = {0};
464✔
962
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
464✔
963
  dnodeObj.createdTime = taosGetTimestampMs();
464✔
964
  dnodeObj.updateTime = dnodeObj.createdTime;
464✔
965
  dnodeObj.port = pCreate->port;
464✔
966
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
464✔
967
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
464✔
968

969
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
464✔
970
  if (pTrans == NULL) {
464!
971
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
972
    if (terrno != 0) code = terrno;
×
973
    goto _OVER;
×
974
  }
975
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
464!
976
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
464!
977

978
  pRaw = mndDnodeActionEncode(&dnodeObj);
464✔
979
  if (pRaw == NULL) {
464!
980
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
981
    if (terrno != 0) code = terrno;
×
982
    goto _OVER;
×
983
  }
984
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
464!
985
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
464!
986
  pRaw = NULL;
464✔
987

988
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
464!
989
  code = 0;
464✔
990

991
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
464✔
992
                                   1);  // TODO: check the return value
993
_OVER:
464✔
994
  mndTransDrop(pTrans);
464✔
995
  sdbFreeRaw(pRaw);
464✔
996
  return code;
464✔
997
}
998

999
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
95✔
1000
  SMnode       *pMnode = pReq->info.node;
95✔
1001
  SSdb         *pSdb = pMnode->pSdb;
95✔
1002
  SDnodeObj    *pObj = NULL;
95✔
1003
  void         *pIter = NULL;
95✔
1004
  SDnodeListRsp rsp = {0};
95✔
1005
  int32_t       code = -1;
95✔
1006

1007
  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
95✔
1008
  if (NULL == rsp.dnodeList) {
95!
1009
    mError("failed to alloc epSet while process dnode list req");
×
1010
    code = terrno;
×
1011
    goto _OVER;
×
1012
  }
1013

1014
  while (1) {
114✔
1015
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
209✔
1016
    if (pIter == NULL) break;
209✔
1017

1018
    SEpSet epSet = {0};
114✔
1019
    epSet.numOfEps = 1;
114✔
1020
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
114✔
1021
    epSet.eps[0].port = pObj->port;
114✔
1022

1023
    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
228!
1024
      if (terrno != 0) code = terrno;
×
1025
      sdbRelease(pSdb, pObj);
×
1026
      sdbCancelFetch(pSdb, pIter);
×
1027
      goto _OVER;
×
1028
    }
1029

1030
    sdbRelease(pSdb, pObj);
114✔
1031
  }
1032

1033
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
95✔
1034
  void   *pRsp = rpcMallocCont(rspLen);
95✔
1035
  if (pRsp == NULL) {
95!
1036
    code = terrno;
×
1037
    goto _OVER;
×
1038
  }
1039

1040
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
95!
1041
    code = rspLen;
×
1042
    goto _OVER;
×
1043
  }
1044

1045
  pReq->info.rspLen = rspLen;
95✔
1046
  pReq->info.rsp = pRsp;
95✔
1047
  code = 0;
95✔
1048

1049
_OVER:
95✔
1050

1051
  if (code != 0) {
95!
1052
    mError("failed to get dnode list since %s", tstrerror(code));
×
1053
  }
1054

1055
  tFreeSDnodeListRsp(&rsp);
95✔
1056

1057
  TAOS_RETURN(code);
95✔
1058
}
1059

1060
static void getSlowLogScopeString(int32_t scope, char* result){
18✔
1061
  if(scope == SLOW_LOG_TYPE_NULL) {
18!
1062
    (void)strcat(result, "NONE");
×
1063
    return;
×
1064
  }
1065
  while(scope > 0){
36✔
1066
    if(scope & SLOW_LOG_TYPE_QUERY) {
18!
1067
      (void)strcat(result, "QUERY");
18✔
1068
      scope &= ~SLOW_LOG_TYPE_QUERY;
18✔
1069
    } else if(scope & SLOW_LOG_TYPE_INSERT) {
×
1070
      (void)strcat(result, "INSERT");
×
1071
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1072
    } else if(scope & SLOW_LOG_TYPE_OTHERS) {
×
1073
      (void)strcat(result, "OTHERS");
×
1074
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1075
    } else{
1076
      (void)printf("invalid slow log scope:%d", scope);
×
1077
      return;
×
1078
    }
1079

1080
    if(scope > 0) {
18!
1081
      (void)strcat(result, "|");
×
1082
    }
1083
  }
1084
}
1085

1086
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
11✔
1087
  SShowVariablesRsp rsp = {0};
11✔
1088
  int32_t           code = -1;
11✔
1089

1090
  if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIABLES) != 0) {
11!
1091
    goto _OVER;
×
1092
  }
1093

1094
  rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
11✔
1095
  if (NULL == rsp.variables) {
11!
1096
    mError("failed to alloc SVariablesInfo array while process show variables req");
×
1097
    code = terrno;
×
1098
    goto _OVER;
×
1099
  }
1100

1101
  SVariablesInfo info = {0};
11✔
1102

1103
  (void)strcpy(info.name, "statusInterval");
11✔
1104
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
11✔
1105
  (void)strcpy(info.scope, "server");
11✔
1106
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1107
    code = terrno;
×
1108
    goto _OVER;
×
1109
  }
1110

1111
  (void)strcpy(info.name, "timezone");
11✔
1112
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
11✔
1113
  (void)strcpy(info.scope, "both");
11✔
1114
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1115
    code = terrno;
×
1116
    goto _OVER;
×
1117
  }
1118

1119
  (void)strcpy(info.name, "locale");
11✔
1120
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
11✔
1121
  (void)strcpy(info.scope, "both");
11✔
1122
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1123
    code = terrno;
×
1124
    goto _OVER;
×
1125
  }
1126

1127
  (void)strcpy(info.name, "charset");
11✔
1128
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
11✔
1129
  (void)strcpy(info.scope, "both");
11✔
1130
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1131
    code = terrno;
×
1132
    goto _OVER;
×
1133
  }
1134

1135
  (void)strcpy(info.name, "monitor");
11✔
1136
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
11✔
1137
  (void)strcpy(info.scope, "server");
11✔
1138
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1139
    code = terrno;
×
1140
    goto _OVER;
×
1141
  }
1142

1143
  (void)strcpy(info.name, "monitorInterval");
11✔
1144
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
11✔
1145
  (void)strcpy(info.scope, "server");
11✔
1146
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1147
    code = terrno;
×
1148
    goto _OVER;
×
1149
  }
1150

1151
  (void)strcpy(info.name, "slowLogThreshold");
11✔
1152
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
11✔
1153
  (void)strcpy(info.scope, "server");
11✔
1154
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1155
    code = terrno;
×
1156
    goto _OVER;
×
1157
  }
1158

1159
  (void)strcpy(info.name, "slowLogMaxLen");
11✔
1160
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
11✔
1161
  (void)strcpy(info.scope, "server");
11✔
1162
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1163
    code = terrno;
×
1164
    goto _OVER;
×
1165
  }
1166

1167
  char scopeStr[64] = {0};
11✔
1168
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
11✔
1169
  (void)strcpy(info.name, "slowLogScope");
11✔
1170
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
11✔
1171
  (void)strcpy(info.scope, "server");
11✔
1172
  if (taosArrayPush(rsp.variables, &info) == NULL) {
22!
1173
    code = terrno;
×
1174
    goto _OVER;
×
1175
  }
1176

1177
  int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
11✔
1178
  void   *pRsp = rpcMallocCont(rspLen);
11✔
1179
  if (pRsp == NULL) {
11!
1180
    code = terrno;
×
1181
    goto _OVER;
×
1182
  }
1183

1184
  if ((rspLen = tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp)) <= 0) {
11!
1185
    code = rspLen;
×
1186
    goto _OVER;
×
1187
  }
1188

1189
  pReq->info.rspLen = rspLen;
11✔
1190
  pReq->info.rsp = pRsp;
11✔
1191
  code = 0;
11✔
1192

1193
_OVER:
11✔
1194

1195
  if (code != 0) {
11!
1196
    mError("failed to get show variables info since %s", tstrerror(code));
×
1197
  }
1198

1199
  tFreeSShowVariablesRsp(&rsp);
11✔
1200
  TAOS_RETURN(code);
11✔
1201
}
1202

1203
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
465✔
1204
  SMnode         *pMnode = pReq->info.node;
465✔
1205
  int32_t         code = -1;
465✔
1206
  SDnodeObj      *pDnode = NULL;
465✔
1207
  SCreateDnodeReq createReq = {0};
465✔
1208

1209
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
465!
1210
    goto _OVER;
×
1211
  }
1212

1213
  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
465!
1214

1215
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
465!
1216
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);
465✔
1217

1218
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
464!
1219
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1220
    goto _OVER;
×
1221
  }
1222

1223
  char ep[TSDB_EP_LEN];
1224
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
464✔
1225
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
464✔
1226
  if (pDnode != NULL) {
464!
1227
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1228
    goto _OVER;
×
1229
  }
1230

1231
  code = mndCreateDnode(pMnode, pReq, &createReq);
464✔
1232
  if (code == 0) {
464!
1233
    code = TSDB_CODE_ACTION_IN_PROGRESS;
464✔
1234
    tsGrantHBInterval = 5;
464✔
1235
  }
1236

1237
  char obj[200] = {0};
464✔
1238
  (void)sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);
464✔
1239

1240
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
464✔
1241

1242
_OVER:
465✔
1243
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
465!
1244
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
1!
1245
  }
1246

1247
  mndReleaseDnode(pMnode, pDnode);
465✔
1248
  tFreeSCreateDnodeReq(&createReq);
465✔
1249
  TAOS_RETURN(code);
465✔
1250
}
1251

1252
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1253

1254
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
4✔
1255

1256
#ifndef TD_ENTERPRISE
1257
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1258
#endif
1259

1260
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
22✔
1261
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1262
  int32_t  code = -1;
22✔
1263
  SSdbRaw *pRaw = NULL;
22✔
1264
  STrans  *pTrans = NULL;
22✔
1265

1266
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
22✔
1267
  if (pTrans == NULL) {
22!
1268
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1269
    if (terrno != 0) code = terrno;
×
1270
    goto _OVER;
×
1271
  }
1272
  mndTransSetSerial(pTrans);
22✔
1273
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
22!
1274
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
22!
1275

1276
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1277
  if (pRaw == NULL) {
22!
1278
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1279
    if (terrno != 0) code = terrno;
×
1280
    goto _OVER;
×
1281
  }
1282
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
22!
1283
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
22!
1284
  pRaw = NULL;
22✔
1285

1286
  pRaw = mndDnodeActionEncode(pDnode);
22✔
1287
  if (pRaw == NULL) {
22!
1288
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1289
    if (terrno != 0) code = terrno;
×
1290
    goto _OVER;
×
1291
  }
1292
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
22!
1293
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
22!
1294
  pRaw = NULL;
22✔
1295

1296
  if (pMObj != NULL) {
22✔
1297
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1298
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
4!
1299
  }
1300

1301
  if (pQObj != NULL) {
22✔
1302
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1303
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
3!
1304
  }
1305

1306
  if (pSObj != NULL) {
22✔
1307
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1308
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
3!
1309
  }
1310

1311
  if (numOfVnodes > 0) {
22✔
1312
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
16!
1313
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
16✔
1314
  }
1315

1316
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
20!
1317

1318
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
20✔
1319
                                   1);  // TODO: check the return value
1320
  code = 0;
20✔
1321

1322
_OVER:
22✔
1323
  mndTransDrop(pTrans);
22✔
1324
  sdbFreeRaw(pRaw);
22✔
1325
  TAOS_RETURN(code);
22✔
1326
}
1327

1328
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
24✔
1329
  bool       isEmpty = false;
24✔
1330
  SMnodeObj *pMObj = NULL;
24✔
1331
  SQnodeObj *pQObj = NULL;
24✔
1332
  SSnodeObj *pSObj = NULL;
24✔
1333

1334
  pQObj = mndAcquireQnode(pMnode, dnodeId);
24✔
1335
  if (pQObj) goto _OVER;
24✔
1336

1337
  pSObj = mndAcquireSnode(pMnode, dnodeId);
20✔
1338
  if (pSObj) goto _OVER;
20!
1339

1340
  pMObj = mndAcquireMnode(pMnode, dnodeId);
20✔
1341
  if (pMObj) goto _OVER;
20✔
1342

1343
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
18✔
1344
  if (numOfVnodes > 0) goto _OVER;
18✔
1345

1346
  isEmpty = true;
3✔
1347
_OVER:
24✔
1348
  mndReleaseMnode(pMnode, pMObj);
24✔
1349
  mndReleaseQnode(pMnode, pQObj);
24✔
1350
  mndReleaseSnode(pMnode, pSObj);
24✔
1351
  return isEmpty;
24✔
1352
}
1353

1354
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
30✔
1355
  SMnode       *pMnode = pReq->info.node;
30✔
1356
  int32_t       code = -1;
30✔
1357
  SDnodeObj    *pDnode = NULL;
30✔
1358
  SMnodeObj    *pMObj = NULL;
30✔
1359
  SQnodeObj    *pQObj = NULL;
30✔
1360
  SSnodeObj    *pSObj = NULL;
30✔
1361
  SDropDnodeReq dropReq = {0};
30✔
1362

1363
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
30!
1364

1365
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
30!
1366
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1367
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
30✔
1368

1369
  bool force = dropReq.force;
29✔
1370
  if (dropReq.unsafe) {
29✔
1371
    force = true;
1✔
1372
  }
1373

1374
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
29✔
1375
  if (pDnode == NULL) {
29✔
1376
    int32_t err = terrno;
1✔
1377
    char    ep[TSDB_EP_LEN + 1] = {0};
1✔
1378
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
1✔
1379
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
1✔
1380
    if (pDnode == NULL) {
1!
1381
      code = err;
1✔
1382
      goto _OVER;
1✔
1383
    }
1384
  }
1385

1386
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
28✔
1387
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
28✔
1388
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
28✔
1389
  if (pMObj != NULL) {
28✔
1390
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
9✔
1391
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
2✔
1392
      goto _OVER;
2✔
1393
    }
1394
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
7✔
1395
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
2✔
1396
      goto _OVER;
2✔
1397
    }
1398
  }
1399

1400
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
24✔
1401
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
24✔
1402

1403
  if (isonline && force) {
24!
1404
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1405
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
×
1406
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1407
    goto _OVER;
×
1408
  }
1409

1410
  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
24✔
1411
  if (!isonline && !force && !isEmpty) {
24!
1412
    code = TSDB_CODE_DNODE_OFFLINE;
2✔
1413
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
2!
1414
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1415
    goto _OVER;
2✔
1416
  }
1417

1418
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
22✔
1419
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
22✔
1420

1421
  char obj1[30] = {0};
22✔
1422
  (void)sprintf(obj1, "%d", dropReq.dnodeId);
22✔
1423

1424
  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
22✔
1425

1426
_OVER:
30✔
1427
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
30!
1428
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
10!
1429
  }
1430

1431
  mndReleaseDnode(pMnode, pDnode);
30✔
1432
  mndReleaseMnode(pMnode, pMObj);
30✔
1433
  mndReleaseQnode(pMnode, pQObj);
30✔
1434
  mndReleaseSnode(pMnode, pSObj);
30✔
1435
  tFreeSDropDnodeReq(&dropReq);
30✔
1436
  TAOS_RETURN(code);
30✔
1437
}
1438

1439
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
1,076✔
1440
  int32_t code = 0;
1,076✔
1441
  char *p = pMCfgReq->config;
1,076✔
1442
  while (*p) {
14,961✔
1443
    if (*p == ' ') {
14,006✔
1444
      break;
121✔
1445
    }
1446
    p++;
13,885✔
1447
  }
1448

1449
  size_t optLen = p - pMCfgReq->config;
1,076✔
1450
  (void)strncpy(pDCfgReq->config, pMCfgReq->config, optLen);
1,076✔
1451
  pDCfgReq->config[optLen] = 0;
1,076✔
1452

1453
  if (' ' == pMCfgReq->config[optLen]) {
1,076✔
1454
    // 'key value'
1455
    if (strlen(pMCfgReq->value) != 0) goto _err;
121!
1456
    (void)strcpy(pDCfgReq->value, p + 1);
121✔
1457
  } else {
1458
    // 'key' 'value'
1459
    if (strlen(pMCfgReq->value) == 0) goto _err;
955✔
1460
    (void)strcpy(pDCfgReq->value, pMCfgReq->value);
951✔
1461
  }
1462

1463
  TAOS_RETURN(code);
1,072✔
1464

1465
_err:
4✔
1466
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
4!
1467
  code = TSDB_CODE_INVALID_CFG;
4✔
1468
  TAOS_RETURN(code);
4✔
1469
}
1470

1471
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
998✔
1472
  int32_t code = -1;
998✔
1473
  SSdb   *pSdb = pMnode->pSdb;
998✔
1474
  void   *pIter = NULL;
998✔
1475
  while (1) {
1,002✔
1476
    SDnodeObj *pDnode = NULL;
2,000✔
1477
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
2,000✔
1478
    if (pIter == NULL) break;
2,000✔
1479

1480
    if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
1,002!
1481
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
994✔
1482
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
994✔
1483
      void   *pBuf = rpcMallocCont(bufLen);
994✔
1484

1485
      if (pBuf != NULL) {
994!
1486
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
994!
1487
          code = bufLen;
×
1488
          return code;
×
1489
        }
1490
        mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
994!
1491
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
994✔
1492
        code = tmsgSendReq(&epSet, &rpcMsg);
994✔
1493
      }
1494
    }
1495

1496
    sdbRelease(pSdb, pDnode);
1,002✔
1497
  }
1498

1499
  if (code == -1) {
998✔
1500
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
4✔
1501
  }
1502
  TAOS_RETURN(code);
998✔
1503
}
1504

1505
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
1,085✔
1506
  int32_t       code = 0;
1,085✔
1507
  SMnode       *pMnode = pReq->info.node;
1,085✔
1508
  SMCfgDnodeReq cfgReq = {0};
1,085✔
1509
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
1,085!
1510
  int8_t updateIpWhiteList = 0;
1,085✔
1511
  mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
1,085!
1512
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
1,085✔
1513
    tFreeSMCfgDnodeReq(&cfgReq);
7✔
1514
    TAOS_RETURN(code);
7✔
1515
  }
1516

1517
  SDCfgDnodeReq dcfgReq = {0};
1,078✔
1518
  if (strcasecmp(cfgReq.config, "resetlog") == 0) {
1,078✔
1519
    (void)strcpy(dcfgReq.config, "resetlog");
2✔
1520
#ifdef TD_ENTERPRISE
1521
  } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
1,076!
1522
    int32_t optLen = strlen("s3blocksize");
×
1523
    int32_t flag = -1;
×
1524
    int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
×
1525
    if (code < 0) return code;
×
1526

1527
    if (flag > 1024 * 1024 || (flag > -1 && flag < 1024) || flag < -1) {
×
1528
      mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]",
×
1529
             cfgReq.dnodeId, flag);
1530
      code = TSDB_CODE_INVALID_CFG;
×
1531
      tFreeSMCfgDnodeReq(&cfgReq);
×
1532
      TAOS_RETURN(code);
×
1533
    }
1534

1535
    strcpy(dcfgReq.config, "s3blocksize");
×
1536
    snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
×
1537
#endif
1538
  } else {
1539
    TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
1,076✔
1540
    if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
1,071!
1541
      mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
×
1542
      code = TSDB_CODE_INVALID_CFG;
×
1543
      goto _err_out;
×
1544
    }
1545
    if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
1,071✔
1546
      updateIpWhiteList = 1;
4✔
1547
    }
1548

1549
    TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true), NULL, _err_out);
1,071✔
1550
  }
1551

1552
  {  // audit
1553
    char obj[50] = {0};
998✔
1554
    (void)sprintf(obj, "%d", cfgReq.dnodeId);
998✔
1555

1556
    auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
998✔
1557
  }
1558

1559
  tFreeSMCfgDnodeReq(&cfgReq);
998✔
1560

1561
  code = mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
998✔
1562

1563
  // dont care suss or succ;
1564
  if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
998✔
1565
  TAOS_RETURN(code);
998✔
1566

1567
_err_out:
80✔
1568
  tFreeSMCfgDnodeReq(&cfgReq);
80✔
1569
  TAOS_RETURN(code);
80✔
1570
}
1571

1572
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
994✔
1573
  mInfo("config rsp from dnode");
994!
1574
  return 0;
994✔
1575
}
1576

1577
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
1✔
1578
  int32_t code = 0;
1✔
1579
  SMnode *pMnode = pReq->info.node;
1✔
1580
  SSdb   *pSdb = pMnode->pSdb;
1✔
1581
  void   *pIter = NULL;
1✔
1582
  int8_t  encrypting = 0;
1✔
1583

1584
  const STraceId *trace = &pReq->info.traceId;
1✔
1585

1586
  int32_t klen = strlen(pDcfgReq->value);
1✔
1587
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
1!
1588
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1589
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1590
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1591
    goto _exit;
×
1592
  }
1593

1594
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
1!
1595
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1596
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1597
    goto _exit;
×
1598
  }
1599

1600
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
1!
1601
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1602
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1603
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1604
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1605
    goto _exit;
×
1606
  }
1607

1608
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
1✔
1609
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
1✔
1610
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
1✔
1611

1612
  while (1) {
1✔
1613
    SDnodeObj *pDnode = NULL;
2✔
1614
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
2✔
1615
    if (pIter == NULL) break;
2✔
1616
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
1!
1617
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1618
             offlineReason[pDnode->offlineReason]);
1619
      sdbRelease(pSdb, pDnode);
×
1620
      continue;
×
1621
    }
1622

1623
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
1!
1624
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
1✔
1625
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
1✔
1626
      void   *pBuf = rpcMallocCont(bufLen);
1✔
1627

1628
      if (pBuf != NULL) {
1!
1629
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
1!
1630
          code = bufLen;
×
1631
          sdbRelease(pSdb, pDnode);
×
1632
          goto _exit;
×
1633
        }
1634
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
1✔
1635
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
1!
1636
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
1✔
1637
        }
1638
      }
1639
    }
1640

1641
    sdbRelease(pSdb, pDnode);
1✔
1642
  }
1643

1644
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
1!
1645
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1646
  }
1647

1648
_exit:
1✔
1649
  if (code != 0) {
1!
1650
    if (terrno == 0) terrno = code;
×
1651
  }
1652
  return code;
1✔
1653
}
1654

1655
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
1✔
1656
  int32_t code = 0;
1✔
1657

1658
#ifdef TD_ENTERPRISE
1659
  SMnode       *pMnode = pReq->info.node;
1✔
1660
  SMCfgDnodeReq cfgReq = {0};
1✔
1661
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
1!
1662

1663
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
1!
1664
    tFreeSMCfgDnodeReq(&cfgReq);
×
1665
    TAOS_RETURN(code);
×
1666
  }
1667
  const STraceId *trace = &pReq->info.traceId;
1✔
1668
  SDCfgDnodeReq   dcfgReq = {0};
1✔
1669
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
1!
1670
    strcpy(dcfgReq.config, cfgReq.config);
1✔
1671
    strcpy(dcfgReq.value, cfgReq.value);
1✔
1672
    tFreeSMCfgDnodeReq(&cfgReq);
1✔
1673
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
1✔
1674
  } else {
1675
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1676
    tFreeSMCfgDnodeReq(&cfgReq);
×
1677
    TAOS_RETURN(code);
×
1678
  }
1679

1680
#else
1681
  TAOS_RETURN(code);
1682
#endif
1683
}
1684

1685
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
1✔
1686
  SMnode *pMnode = pRsp->info.node;
1✔
1687
  int16_t nSuccess = 0;
1✔
1688
  int16_t nFailed = 0;
1✔
1689

1690
  if (0 == pRsp->code) {
1!
1691
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
1✔
1692
  } else {
1693
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1694
  }
1695

1696
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
1✔
1697
  bool    finished = nSuccess + nFailed >= nReq;
1✔
1698

1699
  if (finished) {
1!
1700
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
1✔
1701
  }
1702

1703
  const STraceId *trace = &pRsp->info.traceId;
1✔
1704
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
1!
1705
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1706

1707
  return 0;
1✔
1708
}
1709

1710
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
7✔
1711
  SMnode *pMnode = pReq->info.node;
7✔
1712
  int32_t totalRows = 0;
7✔
1713
  int32_t numOfRows = 0;
7✔
1714
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
7✔
1715
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
7✔
1716
  char   *pWrite = NULL;
7✔
1717
  int32_t cols = 0;
7✔
1718
  int32_t code = 0;
7✔
1719
  int32_t lino = 0;
7✔
1720

1721
  cfgOpts[totalRows] = "statusInterval";
7✔
1722
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
7✔
1723
  totalRows++;
7✔
1724

1725
  cfgOpts[totalRows] = "timezone";
7✔
1726
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
7✔
1727
  totalRows++;
7✔
1728

1729
  cfgOpts[totalRows] = "locale";
7✔
1730
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
7✔
1731
  totalRows++;
7✔
1732

1733
  cfgOpts[totalRows] = "charset";
7✔
1734
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
7✔
1735
  totalRows++;
7✔
1736

1737
  cfgOpts[totalRows] = "monitor";
7✔
1738
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
7✔
1739
  totalRows++;
7✔
1740

1741
  cfgOpts[totalRows] = "monitorInterval";
7✔
1742
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
7✔
1743
  totalRows++;
7✔
1744

1745
  cfgOpts[totalRows] = "slowLogThreshold";
7✔
1746
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
7✔
1747
  totalRows++;
7✔
1748

1749
  cfgOpts[totalRows] = "slowLogMaxLen";
7✔
1750
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
7✔
1751
  totalRows++;
7✔
1752

1753
  char scopeStr[64] = {0};
7✔
1754
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
7✔
1755
  cfgOpts[totalRows] = "slowLogScope";
7✔
1756
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
7✔
1757
  totalRows++;
7✔
1758

1759
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1760
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
7✔
1761

1762
  for (int32_t i = 0; i < totalRows; i++) {
70✔
1763
    cols = 0;
63✔
1764

1765
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
63✔
1766
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1767
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
63!
1768

1769
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
63✔
1770
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
63✔
1771
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
63!
1772

1773
    numOfRows++;
63✔
1774
  }
1775

1776
_OVER:
7✔
1777
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
7!
1778
  pShow->numOfRows += numOfRows;
7✔
1779
  return numOfRows;
7✔
1780
}
1781

1782
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1783

1784
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
799✔
1785
  SMnode    *pMnode = pReq->info.node;
799✔
1786
  SSdb      *pSdb = pMnode->pSdb;
799✔
1787
  int32_t    numOfRows = 0;
799✔
1788
  int32_t    cols = 0;
799✔
1789
  ESdbStatus objStatus = 0;
799✔
1790
  SDnodeObj *pDnode = NULL;
799✔
1791
  int64_t    curMs = taosGetTimestampMs();
799✔
1792
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
1793
  int32_t    code = 0;
799✔
1794
  int32_t    lino = 0;
799✔
1795

1796
  while (numOfRows < rows) {
3,776!
1797
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
3,776✔
1798
    if (pShow->pIter == NULL) break;
3,776✔
1799
    bool online = mndIsDnodeOnline(pDnode, curMs);
2,977✔
1800

1801
    cols = 0;
2,977✔
1802

1803
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1804
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
2,977!
1805

1806
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
2,977✔
1807

1808
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1809
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,977!
1810

1811
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1812
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
2,977✔
1813
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
2,977!
1814

1815
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1816
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
2,977!
1817
                        &lino, _OVER);
1818

1819
    const char *status = "ready";
2,977✔
1820
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
2,977!
1821
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
2,977!
1822
    if (!online) {
2,977✔
1823
      if (objStatus == SDB_STATUS_CREATING)
422!
1824
        status = "creating*";
×
1825
      else if (objStatus == SDB_STATUS_DROPPING)
422!
1826
        status = "dropping*";
×
1827
      else
1828
        status = "offline";
422✔
1829
    }
1830

1831
    STR_TO_VARSTR(buf, status);
2,977✔
1832
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1833
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,977!
1834

1835
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1836
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
2,977!
1837
                        _OVER);
1838

1839
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1840
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
2,977!
1841
                        _OVER);
1842

1843
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
2,977✔
1844
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
2,977✔
1845

1846
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1847
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
2,977!
1848
    taosMemoryFreeClear(b);
2,977!
1849

1850
#ifdef TD_ENTERPRISE
1851
    STR_TO_VARSTR(buf, pDnode->machineId);
2,977✔
1852
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,977✔
1853
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,977!
1854
#endif
1855

1856
    numOfRows++;
2,977✔
1857
    sdbRelease(pSdb, pDnode);
2,977✔
1858
  }
1859

1860
_OVER:
×
1861
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
799!
1862

1863
  pShow->numOfRows += numOfRows;
799✔
1864
  return numOfRows;
799✔
1865
}
1866

1867
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1868
  SSdb *pSdb = pMnode->pSdb;
×
1869
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1870
}
×
1871

1872
// get int32_t value from 'SMCfgDnodeReq'
1873
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
×
1874
  int32_t code = 0;
×
1875
  if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
×
1876
    goto _err;
×
1877
  }
1878

1879
  if (' ' == pMCfgReq->config[optLen]) {
×
1880
    // 'key value'
1881
    if (strlen(pMCfgReq->value) != 0) goto _err;
×
1882
    *pOutValue = atoi(pMCfgReq->config + optLen + 1);
×
1883
  } else {
1884
    // 'key' 'value'
1885
    if (strlen(pMCfgReq->value) == 0) goto _err;
×
1886
    *pOutValue = atoi(pMCfgReq->value);
×
1887
  }
1888

1889
  TAOS_RETURN(code);
×
1890

1891
_err:
×
1892
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
×
1893
  code = TSDB_CODE_INVALID_CFG;
×
1894
  TAOS_RETURN(code);
×
1895
}
1896

1897
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
3,295✔
1898
  SDnodeObj *pObj = NULL;
3,295✔
1899
  void      *pIter = NULL;
3,295✔
1900
  SSdb      *pSdb = pMnode->pSdb;
3,295✔
1901
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
3,295✔
1902
  while (1) {
2,938✔
1903
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
6,233✔
1904
    if (pIter == NULL) break;
6,233✔
1905

1906
    char *fqdn = taosStrdup(pObj->fqdn);
2,938✔
1907
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
2,938!
1908
      mError("failed to fqdn into array, but continue at this time");
×
1909
    }
1910
    sdbRelease(pSdb, pObj);
2,938✔
1911
  }
1912
  return fqdns;
3,295✔
1913
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc