• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4926

13 Jan 2026 05:43AM UTC coverage: 66.053% (-0.05%) from 66.107%
#4926

push

travis-ci

web-flow
feat: [6654385780] show snap progress (#34203)

48 of 59 new or added lines in 7 files covered. (81.36%)

582 existing lines in 124 files now uncovered.

200362 of 303334 relevant lines covered (66.05%)

132283104.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.57
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
// Human-readable offline reasons, looked up by numeric reason index.
// NOTE(review): the order presumably mirrors the DND_REASON_* enumeration
// declared elsewhere -- confirm before inserting or reordering entries.
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
67

68
enum {
69
  DND_ACTIVE_CODE,
70
  DND_CONN_ACTIVE_CODE,
71
};
72

73
enum {
74
  DND_CREATE,
75
  DND_ADD,
76
  DND_DROP,
77
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109

110
#ifdef _GRANT
111
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
112
#else
113
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
114
#endif
115

116
/**
 * Registers the dnode SDB table plus every message handler and show handler
 * implemented in this file.
 *
 * @param pMnode mnode instance receiving the registrations.
 * @return the value returned by sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // SDB callbacks for SDB_DNODE rows, keyed by an int32 id.
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  // Request / response message handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  // Retrieve / iterator-cancel handlers for the configs and dnode show tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // Key synchronisation and encrypt-key alteration handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
154

155
// Intentionally empty: this function performs no teardown work.
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
399,208✔
156

157
/**
 * Deploy callback for the dnode SDB table: builds dnode 1 from the local
 * fqdn/port and commits it through a "create-dnode" transaction.
 *
 * @param pMnode mnode instance that owns the transaction.
 * @return 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // A missing machine id is fatal here. Never leave through _OVER with
    // code == 0, otherwise the caller would see success although no dnode
    // was created.
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  // NOTE(review): pRaw is presumably owned by pTrans once appended; if so, a
  // failure of sdbSetRawStatus() would free it twice at _OVER -- confirm.
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  pRaw = NULL;  // skip the sdbFreeRaw() below for the appended raw

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
208

209
/**
 * Serialises a dnode object into a newly allocated SDB raw record.
 *
 * @param pDnode dnode to encode.
 * @return the encoded raw, or NULL with terrno set when allocation or any
 *         field write fails.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
241

242
/**
 * Rebuilds a dnode row from an SDB raw record.
 *
 * Soft versions 1..TSDB_DNODE_VER_NUMBER are accepted. For versions above 1
 * two length-prefixed binary fields are read and discarded.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Assume failure until the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) {
    goto _OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver >= 2) {
    // Two length-prefixed blobs follow; their payloads are skipped (NULL destination).
    int16_t blobLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &blobLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, blobLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &blobLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, blobLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
293

294
/**
 * SDB insert callback: marks the dnode as "status not received" and derives
 * its "fqdn:port" endpoint string.
 *
 * @param pSdb   unused.
 * @param pDnode row being inserted; offlineReason and ep are written.
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a scratch buffer first, then copy into the row.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
303

304
/**
 * SDB delete callback: only traces the deletion.
 *
 * @param pSdb   unused.
 * @param pDnode row being deleted.
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
308

309
/**
 * SDB update callback: copies updateTime (and, in enterprise/astra builds,
 * machineId) from the new row into the existing row.
 *
 * @param pSdb unused.
 * @param pOld existing row, modified in place.
 * @param pNew row carrying the new values.
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
317

318
/**
 * Acquires the dnode with the given id from the SDB.
 *
 * On failure terrno is translated from the SDB code to the matching
 * TSDB_CODE_MND_DNODE_* code; any other SDB error becomes TSDB_CODE_APP_ERROR.
 *
 * @param pMnode  mnode instance whose SDB is searched.
 * @param dnodeId id of the dnode to acquire.
 * @return the acquired dnode, or NULL with terrno set.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      // Log while terrno still holds the SDB error; it is replaced by the
      // generic application error right after.
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  return pDnode;
}
336

337
/**
 * Releases a dnode through sdbRelease() on the mnode's SDB.
 *
 * @param pMnode mnode instance owning the SDB.
 * @param pDnode dnode to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
155,184,811✔
341

342
/**
 * Builds an endpoint set from the dnode's fqdn and port.
 *
 * terrno receives the result of addEpIntoEpSet().
 *
 * @param pDnode dnode providing fqdn and port.
 * @return the endpoint set, returned by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet dnodeEpSet = {0};

  terrno = addEpIntoEpSet(&dnodeEpSet, pDnode->fqdn, pDnode->port);

  return dnodeEpSet;
}
347

348
/**
 * Looks up a dnode by id and returns its endpoint set.
 *
 * @param pMnode  mnode instance used for the lookup.
 * @param dnodeId id of the dnode.
 * @return the dnode's endpoint set, or a zeroed set when the dnode cannot be
 *         acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
358

359
/**
 * Walks the SDB_DNODE table with sdbFetch() and returns the first dnode whose
 * endpoint equals pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * @param pMnode mnode instance owning the SDB.
 * @param pEpStr endpoint string to search for.
 * @return the matching dnode, still acquired (the caller releases it), or
 *         NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // Match: stop the iteration and hand the dnode to the caller.
    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
379

380
/**
 * Same endpoint search as mndAcquireDnodeByEp(), but iterates with
 * sdbFetchAll(). Unlike that function it does not touch terrno when nothing
 * matches.
 *
 * @param pMnode mnode instance owning the SDB.
 * @param pEpStr endpoint string to search for (case-insensitive).
 * @return the matching dnode, still acquired, or NULL when none matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &rowStatus, true);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }

  return NULL;
}
400

401
/**
 * @param pMnode mnode instance owning the SDB.
 * @return sdbGetSize() for the SDB_DNODE table.
 */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
405

406
/**
 * @param pMnode mnode instance owning the SDB.
 * @return sdbGetSize() for the SDB_DB table.
 */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
410

411
/**
 * Reports whether the dnode's last access lies within tsStatusTimeoutMs of
 * curMs.
 *
 * Side effect: when the dnode has timed out, has a positive rebootTime and is
 * still flagged DND_REASON_ONLINE, its offlineReason is changed to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 *
 * @param pDnode dnode to check (offlineReason may be updated).
 * @param curMs  current time in milliseconds.
 * @return true when within the timeout, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  if (elapsedMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
421

422
/**
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) per dnode returned by
 * sdbFetchAll() to pDnodeEps.
 *
 * A failed push is logged and the iteration continues.
 *
 * @param pMnode    mnode instance owning the SDB.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the row before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
35,768,376✔
448

449
/**
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) per
 * dnode returned by sdbFetchAll() to pDnodeInfo.
 *
 * @param pMnode     mnode instance owning the SDB.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success, or terrno as left by the failed taosArrayPush(); the
 *         iteration is cancelled in that case.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so that fields not assigned below never carry stack garbage.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
481

482
// Compares the monitor parameter `para` reported in pCfg->monitorParas with
// the local global of the same name; on mismatch logs, stores `err` in terrno
// and returns `err` from the enclosing function. Expects `pCfg` and `pDnode`
// to be in scope. Wrapped in do/while(0) so it expands to one statement.
// NOTE(review): `err` is a DND_REASON_* value at the visible call sites, while
// the other checks in mndCheckClusterCfgPara() store TSDB_CODE_* values in
// terrno -- confirm this is intended.
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = err;                                                                                        \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
488

489
/**
 * Compares the cluster configuration reported by a dnode with the local
 * settings, in this order: monitor parameters, slow-log except-db, timezone,
 * locale, charset, ttlChangeOnWrite, enableWhiteList, encryption key.
 *
 * @param pMnode mnode instance (checkTime and encryptMgmt are read).
 * @param pDnode dnode that sent the configuration; used for logging.
 * @param pCfg   configuration reported by the dnode.
 * @return DND_REASON_ONLINE when everything matches, otherwise the
 *         DND_REASON_* value of the first mismatch, with terrno set.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  // The timezone is rejected only when the checkTime values differ as well.
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0 && pMnode->checkTime != pCfg->checkTime) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalise the local switch to 0/1 before comparing.
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // The key comparison is skipped while pMnode->encryptMgmt.encrypting is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
554

555
/**
 * Computes the growth of a counter per millisecond between two samples.
 *
 * @param currentCount  counter value of the newer sample.
 * @param lastCount     counter value of the older sample.
 * @param currentTimeMs timestamp of the newer sample, in milliseconds.
 * @param lastTimeMs    timestamp of the older sample, in milliseconds.
 * @return (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0
 *         unless both the time and the count strictly increased.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t countGain = currentCount - lastCount;
    int64_t elapsedMs = currentTimeMs - lastTimeMs;
    return (double)countGain / (double)elapsedMs;
  }
  return 0.0;
}
565

566
/**
 * Merges the load reported for one vnode replica into its SVnodeGid entry.
 *
 * Index/buffer/snapshot fields are always refreshed. Sync role fields are
 * copied only when role, restore, canRead or start time differ.
 *
 * @param vgId   vgroup id, used for logging only.
 * @param pGid   replica entry to update.
 * @param pVload load reported for that replica.
 * @return true when the role/restore/canRead/startTime state was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // A reported syncTerm of -1 is excluded from the term comparison.
  bool syncStateDiffers = pGid->syncState != pVload->syncState;
  bool syncTermDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  bool roleTimeDiffers = pGid->roleTimeMs != pVload->roleTimeMs;
  bool roleChanged = syncStateDiffers || syncTermDiffers || roleTimeDiffers;

  // While commits are ahead of applies, track how fast the applied index moves.
  // This must run before pGid->syncAppliedIndex is overwritten below.
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate =
          calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;
  pGid->syncTotalIndex = pVload->syncTotalIndex;

  bool snapSeqInRange = (pVload->snapSeq >= 0) && (pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END);
  if (snapSeqInRange || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  bool needsUpdate = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                     pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!needsUpdate) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
615

616
/**
 * Copies the reported sync state into the mnode object when the role
 * (state, term or role time) or the restore flag differs.
 *
 * @param pObj   mnode object to update.
 * @param pMload load reported for that mnode.
 * @return true when pObj was modified.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // A reported syncTerm of -1 is excluded from the term comparison.
  bool syncStateDiffers = pObj->syncState != pMload->syncState;
  bool syncTermDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  bool roleTimeDiffers = pObj->roleTimeMs != pMload->roleTimeMs;
  bool roleChanged = syncStateDiffers || syncTermDiffers || roleTimeDiffers;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
633

634
extern char   *tsMonFwUri;
635
extern char   *tsMonSlowLogUri;
UNCOV
636
/**
 * Handles TDMT_MND_STATIS: deserialises the request and forwards its content
 * to tsMonFwUri (counter type) or tsMonSlowLogUri (slow-log type). Other
 * types are dropped silently.
 *
 * @param pReq incoming RPC message carrying a serialised SStatisReq.
 * @return 0 on success, or the deserialisation error.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
656

657
/**
 * Handles TDMT_MND_AUDIT: records one audit entry when auditing is enabled
 * and tsAuditLevel is at least AUDIT_LEVEL_DATA; otherwise does nothing.
 *
 * @param pReq incoming RPC message carrying a serialised SAuditReq.
 * @return 0 on success or when auditing is off, else the deserialisation error.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!(tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq request = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &request));

  mDebug("received audit req:%s, %s, %s, %s", request.operation, request.db, request.table, request.pSql);

  auditAddRecord(pReq, pMnode->clusterId, request.operation, request.db, request.table, request.pSql,
                 request.sqlLen, request.duration, request.affectedRows);

  tFreeSAuditReq(&request);
  return 0;
}
674

675
/**
 * Handles TDMT_MND_BATCH_AUDIT: records every audit entry of the batch when
 * auditing is enabled and tsAuditLevel is at least AUDIT_LEVEL_DATA.
 *
 * @param pReq incoming RPC message carrying a serialised SBatchAuditReq.
 * @return 0 on success or when auditing is off, else the deserialisation error.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!(tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA)) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq batch = {0};

  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &batch));

  int32_t entryCount = taosArrayGetSize(batch.auditArr);
  for (int32_t idx = 0; idx < entryCount; ++idx) {
    SAuditReq *pEntry = TARRAY_GET_ELEM(batch.auditArr, idx);
    mDebug("received audit req:%s, %s, %s, %s", pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql);

    auditAddRecord(pReq, pMnode->clusterId, pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql,
                   pEntry->sqlLen, pEntry->duration, pEntry->affectedRows);
  }

  tFreeSBatchAuditReq(&batch);
  return 0;
}
697

698
// Queue a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue carrying
// the dnode's id and machine id.
//
// pMnode: mnode whose message callback is used to enqueue the request.
// pDnode: dnode whose id/machineId are serialized into the request.
// Returns 0 on success, otherwise an error code (serialization failure,
// allocation failure from rpcMallocCont, or the enqueue result).
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so it must be released here.
    rpcFreeCont(pReq);
    pReq = NULL;
    // A zero length is treated as a failure, same as in the sizing pass above;
    // otherwise the function would report success without queuing anything.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): who frees pReq when tmsgPutToQueue fails is not visible from
  // this function; it is left untouched here to avoid a double free - confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
728

729
// Handle an update-dnode-info request: refresh the dnode's updateTime and
// persist the dnode object through an "update-dnode-obj" transaction.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // For each NULL result below, fall back to TSDB_CODE_MND_RETURN_VALUE_NULL
  // when terrno was not set, so a NULL object is never used further down.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the cleanup below does not free the raw a second time.
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
775

776
// Handle a dnode status (heartbeat) request.
//
// Steps, in order:
//   1. decode the request and reject it if it names another cluster;
//   2. locate the dnode object by id, falling back to its endpoint;
//   3. compare the versions/flags reported by the dnode with the mnode's view
//      to decide whether a full status response is needed (needCheck);
//   4. reject the request if the dnode clock differs too much from this node;
//   5. copy the reported vnode / mnode / qnode load into the matching objects;
//   6. when needCheck is set, validate version and cluster id, refresh the
//      dnode fields and build the SStatusRsp into pReq->info.rsp;
//   7. update the dnode access counters.
//
// Returns 0 via mndUpdClusterInfo() on success, otherwise a non-zero code.
//
// Every failure path sets `code` explicitly. TAOS_CHECK_GOTO appears to store
// the result of the deserialize call in `code`, so after a successful decode
// `code` is 0 and paths that only set terrno would be reported as success.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode does not know its id yet: look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different dnode id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        code = (err != 0) ? err : TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto _OVER;
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        code = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) {
          code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto _OVER;
        }
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditTokenChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  if (tsAuditUseToken) {
    SDbObj *pDb = mndAcquireAuditDb(pMnode);
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      // On a parse failure `name` is still zero-initialized, so auditDB stays empty.
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
      mndReleaseDb(pMnode, pDb);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = 0;
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      // NOTE(review): the token is looked up for the literal user "audit", not
      // for auditUser fetched above - confirm this is intended.
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken,
               tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true;
      }
    }
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Clock difference in whole seconds; llabs keeps the full 64-bit value on
  // platforms where long is 32 bits.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (llabs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are taken only from the replica reporting itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      code = terrno;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        code = terrno;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        code = terrno;
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = terrno;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditDB[0] != '\0') {
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
    }
    if (auditToken[0] != '\0') {
      // NOTE(review): this writes the audit token itself to the info log - confirm that is acceptable.
      mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken);
      tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
    }

    // Sizing pass first, then encode into a buffer of exactly that size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was not attached to the request yet, so release it here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1060

1061
// Handle a notify request: after checking that the sender belongs to this
// cluster, copy the reported time-series count into every known vgroup and
// then refresh the cluster info.
// Returns 0 on success, otherwise the decode error, the cluster-mismatch code,
// or the result of mndUpdClusterInfo().
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t localClusterId = mndGetClusterId(pMnode);
  // A zero cluster id in the request is accepted without comparison.
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != localClusterId) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  int32_t numLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t i = 0; i < numLoads; ++i) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, i);
    SVgObj         *pVgroup = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVgroup == NULL) {
      continue;  // vgroup not found: nothing to update
    }
    pVgroup->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVgroup);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1094

1095
// Build a new dnode object from the create request and submit it through a
// "create-dnode" transaction.
//
// pMnode:  mnode owning the sdb the dnode is written to.
// pReq:    originating RPC message, attached to the transaction.
// pCreate: request carrying the fqdn and port of the new dnode.
// Returns 0 once the transaction is prepared, otherwise an error code.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pCommitRaw = NULL;
  SDnodeObj newDnode = {0};

  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    // Prefer the specific reason in terrno when one was recorded.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&newDnode);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free the raw again.
  pCommitRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  return code;
}
1135

1136
// Handle a dnode-list request: collect the id, fqdn and port of every dnode
// in the sdb and return them serialized in pReq->info.rsp.
//
// Returns 0 on success, otherwise an error code; on failure no response
// buffer is left attached to the request.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: drop the object and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Sizing pass; a non-positive length means there is nothing valid to send.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = (rspLen != 0) ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was not attached to the request yet, so release it here,
    // and never report success without a response.
    rpcFreeCont(pRsp);
    code = (rspLen != 0) ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1197

1198
void getSlowLogScopeString(int32_t scope, char *result) {
1,127✔
1199
  if (scope == SLOW_LOG_TYPE_NULL) {
1,127✔
1200
    (void)strncat(result, "NONE", 64);
×
1201
    return;
×
1202
  }
1203
  while (scope > 0) {
2,254✔
1204
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,127✔
1205
      (void)strncat(result, "QUERY", 64);
1,127✔
1206
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,127✔
1207
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1208
      (void)strncat(result, "INSERT", 64);
×
1209
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1210
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1211
      (void)strncat(result, "OTHERS", 64);
×
1212
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1213
    } else {
1214
      (void)printf("invalid slow log scope:%d", scope);
×
1215
      return;
×
1216
    }
1217

1218
    if (scope > 0) {
1,127✔
1219
      (void)strncat(result, "|", 64);
×
1220
    }
1221
  }
1222
}
1223

1224
// Handle a create-dnode request.
//
// Order of checks: grants (dnode count, then CPU cores), request decoding,
// operation privilege, endpoint validity, and that no dnode with the same
// endpoint already exists. The dnode is then created via mndCreateDnode().
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the creation was started,
// otherwise the error code of the failing step.
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  int32_t         lino = 0;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int64_t         startMs = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE), &lino,
                  _OVER);

  // Reject an empty host name or a port outside 1..UINT16_MAX.
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // The attempt is audited whether or not mndCreateDnode() succeeded.
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    int64_t endMs = taosGetTimestampMs();
    double  duration = (double)(endMs - startMs) / 1000;  // elapsed time in seconds
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1287

1288
// Actual restore-dnode handler. Presumably provided elsewhere when
// TD_ENTERPRISE is defined; otherwise the stub below is used.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests; forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1295

1296
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,148✔
1297
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1298
  int32_t  code = -1;
9,148✔
1299
  SSdbRaw *pRaw = NULL;
9,148✔
1300
  STrans  *pTrans = NULL;
9,148✔
1301
  int32_t  lino = 0;
9,148✔
1302

1303
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,148✔
1304
  if (pTrans == NULL) {
9,148✔
1305
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1306
    if (terrno != 0) code = terrno;
×
1307
    goto _OVER;
×
1308
  }
1309
  mndTransSetGroupParallel(pTrans);
9,148✔
1310
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,148✔
1311
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,148✔
1312

1313
  pRaw = mndDnodeActionEncode(pDnode);
9,148✔
1314
  if (pRaw == NULL) {
9,148✔
1315
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1316
    if (terrno != 0) code = terrno;
×
1317
    goto _OVER;
×
1318
  }
1319
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,148✔
1320
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,148✔
1321
  pRaw = NULL;
9,148✔
1322

1323
  pRaw = mndDnodeActionEncode(pDnode);
9,148✔
1324
  if (pRaw == NULL) {
9,148✔
1325
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1326
    if (terrno != 0) code = terrno;
×
1327
    goto _OVER;
×
1328
  }
1329
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,148✔
1330
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,148✔
1331
  pRaw = NULL;
9,148✔
1332

1333
  if (pSObj != NULL) {
9,148✔
1334
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
492✔
1335
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
492✔
1336
  }
1337

1338
  if (pMObj != NULL) {
9,148✔
1339
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
231✔
1340
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
231✔
1341
  }
1342

1343
  if (pQObj != NULL) {
9,148✔
1344
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
142✔
1345
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
142✔
1346
  }
1347

1348
  if (pBObj != NULL) {
9,148✔
1349
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
747✔
1350
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
747✔
1351
  }
1352

1353
  if (numOfVnodes > 0) {
8,401✔
1354
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
6,968✔
1355
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
6,968✔
1356
  }
1357

1358
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,401✔
1359

1360
  code = 0;
8,401✔
1361

1362
_OVER:
9,148✔
1363
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,148✔
1364
  mndTransDrop(pTrans);
9,148✔
1365
  sdbFreeRaw(pRaw);
9,148✔
1366
  TAOS_RETURN(code);
9,148✔
1367
}
1368

1369
// Report whether the dnode hosts nothing: no qnode, no snode, no mnode and no
// vnodes. The checks run in that order and stop at the first hit.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       isEmpty = false;
  SMnodeObj *pMObj = NULL;
  SSnodeObj *pSObj = NULL;
  SQnodeObj *pQObj = mndAcquireQnode(pMnode, dnodeId);

  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSObj == NULL) {
      pMObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMObj == NULL) {
        isEmpty = (mndGetVnodesNum(pMnode, dnodeId) <= 0);
      }
    }
  }

  // Release whatever was acquired; pointers that were never set are passed as NULL.
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1394

1395
/*
 * Handle a DROP DNODE request.
 *
 * Steps:
 *   1. Deserialize the request and check the caller's privilege.
 *   2. Resolve the target dnode by id; if no dnode has that id, fall back to
 *      the "fqdn:port" endpoint carried in the request.
 *   3. Reject the request when the dnode hosts the only mnode, hosts this
 *      mnode itself, has a mount (unless forced), is online while force is
 *      requested, or is/has an offline vnode while force is not requested.
 *   4. Hand over to mndDropDnode() and, if audit is enabled, record the
 *      operation.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropDnode() succeeded,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although a dnode is dropped - confirm intent.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Keep the error of the id lookup; it is what gets reported if the endpoint lookup fails too.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build the endpoint with an explicit format. The request's fqdn must never be used as the
    // format string itself: it is client-supplied and would also drop the port.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually resolved, so that a dnode found by endpoint
  // is checked against its own sub-nodes rather than against the id sent in the request.
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  // Force drop is only accepted for a dnode that is offline.
  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // NOTE(review): this line and the per-vnode line below look like debug traces logged at error level.
  mError("vnode num:%d", numOfVnodes);

  // Look for a vnode of this dnode whose sync state is offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the scan early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

    // Duration is reported in seconds; the timestamps are in milliseconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1531

1532
/*
 * Distribute a "create encrypt_key" config request to dnodes.
 *
 * pReq     - the originating client request (used for logging/trace only here)
 * dnodeId  - target dnode; -1 or 0 addresses every dnode
 * pDcfgReq - the config name/value pair to forward; value is the key
 *
 * The key length is validated, then the mnode-wide `encrypting` flag is taken
 * with a compare-and-swap so that only one distribution runs at a time. Every
 * dnode whose offlineReason is DND_REASON_ONLINE and that matches dnodeId gets
 * a TDMT_DND_CREATE_ENCRYPT_KEY message; nEncrypt counts the successful sends.
 * If nothing was sent the `encrypting` flag is released here.
 *
 * Returns 0 on success or an error code; on error terrno is set if it was 0.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Take the "encrypting" flag; a non-zero previous value means another request owns it.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      // An allocation failure is skipped silently: the dnode simply does not get the request.
      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so it is still ours to free.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Abandoning the scan: cancel the iterator, then fall through to the
          // nEncrypt check below so the "encrypting" flag is not left set forever.
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the flag; clear it now.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1609

1610
/*
 * Entry point for a client "create encrypt_key" request.
 *
 * When TD_ENTERPRISE or TD_ASTRA_TODO is defined, the request is deserialized,
 * the caller's MND_OPER_CONFIG_DNODE privilege is checked and, if the config
 * name is "encrypt_key", the name/value pair is forwarded to
 * mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
 * TSDB_CODE_PAR_INTERNAL_ERROR. In other builds the function returns 0.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE);
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // 12 covers "encrypt_key" plus its terminator, so only the exact name matches.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy everything that is needed out of cfgReq before it is freed.
  SDCfgDnodeReq dcfgReq = {0};
  int32_t       targetDnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);

  return mndProcessCreateEncryptKeyReqImpl(pReq, targetDnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1639

1640
/*
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * Bumps the success or failure counter according to pRsp->code, and clears
 * the mnode-wide `encrypting` flag once the number of responses has reached
 * the number of requests recorded in nEncrypt. Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Increment the counter for this response and read the other one as well:
  // completion depends on the total of both, not only on the one just bumped.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1664

1665
/*
 * Fill pBlock with a fixed list of (option name, value) rows taken from
 * global settings: statusIntervalMs, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope.
 *
 * The `rows` argument is not consulted. Returns the number of rows written,
 * which is also added to pShow->numOfRows; a column write failure is logged
 * and ends the fill early.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char   *pWrite = NULL;
  int32_t totalRows = 0;
  int32_t numOfRows = 0;
  int32_t cols = 0;
  int32_t code = 0;
  int32_t lino = 0;

// Append one name/value pair to the staging arrays.
#define MND_CFG_ROW(optName, fmt, val)                                  \
  do {                                                                  \
    cfgOpts[totalRows] = optName;                                       \
    (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, fmt, val); \
    totalRows++;                                                        \
  } while (0)

  MND_CFG_ROW("statusIntervalMs", "%d", tsStatusIntervalMs);
  MND_CFG_ROW("timezone", "%s", tsTimezoneStr);
  MND_CFG_ROW("locale", "%s", tsLocale);
  MND_CFG_ROW("charset", "%s", tsCharset);
  MND_CFG_ROW("monitor", "%d", tsEnableMonitor);
  MND_CFG_ROW("monitorInterval", "%d", tsMonitorInterval);
  MND_CFG_ROW("slowLogThreshold", "%d", tsSlowLogThreshold);
  MND_CFG_ROW("slowLogMaxLen", "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  MND_CFG_ROW("slowLogScope", "%s", scopeStr);

#undef MND_CFG_ROW

  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  // Column 0 receives the option name, column 1 its value.
  for (int32_t row = 0; row < totalRows; row++) {
    SColumnInfoData *pColInfo = NULL;
    cols = 0;

    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[row], TSDB_CONFIG_OPTION_LEN);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)nameBuf, false), &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[row], TSDB_CONFIG_VALUE_LEN);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)valueBuf, false), &lino, _OVER);

    numOfRows++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1736

1737
// Cancel callback for the configs listing. The body is empty: mndRetrieveConfigs
// builds its rows from global settings and never stores an iterator in pShow->pIter.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1738

1739
/*
 * Fill pBlock with one row per dnode, up to `rows` rows per call.
 *
 * Iteration state is kept in pShow->pIter, so repeated calls continue where
 * the previous one stopped. Columns, in order: id, endpoint, number of vnodes,
 * supported vnodes, status, create time, reboot time, offline reason and, in
 * TD_ENTERPRISE builds, machine id.
 *
 * Returns the number of rows written; the same amount is added to
 * pShow->numOfRows. A column write failure is logged and ends the fill.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // The vnode count is narrowed to 16 bits before it is written to the column.
    int16_t vnodeCount = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vnodeCount, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status text: sdb object state, with '*' appended (or "offline") when the dnode is not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason: empty for an online dnode.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Route the allocation failure through the same macro as every column failure,
      // so pDnode gets identical failure handling.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary buffer before checking the result; otherwise the
    // failure path would jump away and leak it.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1821

1822
// Cancel an in-progress dnode listing by releasing its SDB_DNODE iterator.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1826

1827
/*
 * Collect the fqdn of every dnode.
 *
 * Returns an array of heap-allocated strings (one `char *` per element), or
 * NULL when the array cannot be created or a string cannot be duplicated.
 * On success the caller owns both the array and the strings in it.
 * A failed push is logged and skipped; the remaining dnodes are still visited.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Read terrno first, before any other call can overwrite it.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      // Log while pObj is still held, then release it and cancel the abandoned scan.
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy must be freed here.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1869

1870
/*
 * Answer a dnode's key-sync request.
 *
 * With TD_ENTERPRISE and TD_HAS_TAOSK defined, the mnode's key version is
 * compared with the version reported by the dnode. On a mismatch the response
 * carries the keys, algorithm and timestamps with needUpdate = 1; otherwise
 * only the version is returned with needUpdate = 0. In other builds the
 * response always carries version 0 and needUpdate = 0.
 *
 * The serialized response is attached to pReq->info.rsp / rspLen.
 * Returns 0 on success or an error code.
 */
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  if ((code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Key file paths are composed here but not read by this function.
  char masterKeyFile[PATH_MAX] = {0};
  char derivedKeyFile[PATH_MAX] = {0};
  snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
           TD_DIRSEP);
  snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
           TD_DIRSEP);

  // Local snapshot of the mnode's key state; stays zeroed when no keys are loaded.
  char    svrKey[129] = {0};
  char    dbKey[129] = {0};
  char    cfgKey[129] = {0};
  char    metaKey[129] = {0};
  char    dataKey[129] = {0};
  int32_t algorithm = 0;
  int32_t cfgAlgorithm = 0;
  int32_t metaAlgorithm = 0;
  int32_t fileVersion = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  const bool keysLoaded = (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED);
  if (keysLoaded) {
    tstrncpy(svrKey, tsSvrKey, 128);
    tstrncpy(dbKey, tsDbKey, 128);
    tstrncpy(cfgKey, tsCfgKey, 128);
    tstrncpy(metaKey, tsMetaKey, 128);
    tstrncpy(dataKey, tsDataKey, 128);
    keyVersion = tsEncryptKeyVersion;
    algorithm = tsEncryptAlgorithmType;
    cfgAlgorithm = tsCfgAlgorithm;
    metaAlgorithm = tsMetaAlgorithm;
    fileVersion = tsEncryptFileVersion;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
  }
  rsp.encryptionKeyStatus = keysLoaded ? TSDB_ENCRYPT_KEY_STAT_LOADED : TSDB_ENCRYPT_KEY_STAT_DISABLED;

  // The mnode's version is reported either way; keys travel only on a mismatch.
  rsp.keyVersion = keyVersion;
  if (req.keyVersion == keyVersion) {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.needUpdate = 0;
  } else {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass sizes the response, second pass writes it into the rpc buffer.
  int32_t rspLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRspBuf = rpcMallocCont(rspLen);
  if (pRspBuf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  rspLen = tSerializeSKeySyncRsp(pRspBuf, rspLen, &rsp);
  if (rspLen < 0) {
    rpcFreeCont(pRspBuf);
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRspBuf;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
1980

1981
// Handler for a key-sync response: the message is not inspected and 0 is always returned.
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) { return 0; }
×
1982

1983
/*
 * Map an sdb object of the given type to its dnode object.
 * A dnode maps to itself; qnode, snode and bnode objects yield their pDnode
 * member. Returns NULL for a NULL object or any other type.
 */
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pDnode = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pDnode = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pDnode = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pDnode = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pDnode = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pDnode;
}
2000
/*
 * Append the endpoint set of every sdb object of `type` to pAddr.
 *
 * Each object is mapped to its dnode with getDnodeObjByType(); objects that
 * yield no dnode are logged and skipped. pAddr holds SEpSet elements.
 * Returns 0 on success, or an error code when an element cannot be appended
 * (entries appended before the failure stay in pAddr).
 */
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Capture terrno before logging/releasing can overwrite it, and never report success here.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // The scan is abandoned midway, so the iterator must be cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2030

2031
/*
 * Append the endpoint sets of all qnodes, snodes, bnodes and dnodes, in that
 * order, to pAddr. Stops at the first failing type and returns its error code.
 * Returns TSDB_CODE_INVALID_PARA when either argument is NULL.
 */
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  const int32_t         numOfTypes = (int32_t)(sizeof(nodeTypes) / sizeof(nodeTypes[0]));
  int32_t               code = 0;

  if (pMnode == NULL || pAddr == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  for (int32_t i = 0; i < numOfTypes; ++i) {
    code = mndGetAllNodeAddrByType(pMnode, nodeTypes[i], pAddr);
    if (code != 0) break;
  }

  return code;
}
2053

2054
/*
 * Handle a request to reload the TLS configuration on all nodes.
 *
 * After deserializing the request and checking the
 * MND_OPER_ALTER_DNODE_RELOAD_TLS privilege, the endpoint sets of all qnodes,
 * snodes, bnodes and dnodes are collected and an empty
 * TDMT_DND_RELOAD_DNODE_TLS message is sent to each of them. A failed send is
 * logged and the remaining endpoints are still tried.
 *
 * Returns 0 when every step succeeded, otherwise the first error encountered.
 */
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq req = {0};
  // Initialized up front: the error paths below jump to _OVER, which destroys pAddr.
  SArray *pAddr = NULL;

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  code = mndGetAllNodeAddr(pMnode, pAddr);
  if (code != 0) {
    mError("failed to collect node addrs for reload tls since %s", tstrerror(code));
    goto _OVER;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    int32_t sendCode = tmsgSendReq(pEpSet, &rpcMsg);
    if (sendCode != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(sendCode));
      // Remember the first failure so a later successful send cannot hide it.
      if (code == 0) code = sendCode;
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2092

2093
// Log the outcome of a dnode's TLS reload; the response code is only logged and 0 is always returned.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }
  return 0;
}
2102

2103
/*
 * Validate an "alter encrypt key" request and forward it to every dnode whose
 * offlineReason is DND_REASON_ONLINE.
 *
 * pAlterReq->keyType must be 0 (SVR_KEY) or 1 (DB_KEY) and the new key length
 * must lie within [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]. The forwarded copy
 * carries only the key type and the new key; the sql text is not forwarded.
 * Send failures are logged per dnode and do not abort the distribution.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA or
 * TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN on validation failure, or the
 * serialization error; on a serialization error terrno is set if it was 0.
 */
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
    void   *pBuf = rpcMallocCont(bufLen);

    // An allocation failure is skipped silently: that dnode simply does not get the request.
    if (pBuf != NULL) {
      if ((bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, bufLen, &alterKeyReq)) <= 0) {
        code = bufLen;
        // The buffer was never handed to the transport, so it is still ours to free.
        rpcFreeCont(pBuf);
        sdbRelease(pSdb, pDnode);
        // The scan is abandoned midway, so the iterator must be cancelled explicitly.
        sdbCancelFetch(pSdb, pIter);
        goto _exit;
      }
      // NOTE(review): the message sent to dnodes uses the TDMT_MND_* type - confirm this is intended.
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
      if (ret != 0) {
        mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
      } else {
        mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2178

2179
/*
 * Entry point for a client "alter encrypt key" request.
 *
 * Checks the MND_OPER_CONFIG_DNODE privilege, deserializes the request and,
 * when TD_ENTERPRISE and TD_HAS_TAOSK are defined, distributes it through
 * mndProcessAlterEncryptKeyReqImpl() and writes an audit record on success.
 * Other builds answer with TSDB_CODE_OPS_NOT_SUPPORT.
 */
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              lino = 0;
  int32_t              code = TSDB_CODE_SUCCESS;
  SMAlterEncryptKeyReq alterReq = {0};
  SMnode              *pMnode = pReq->info.node;

  // Privilege comes first, before the payload is even parsed.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  if ((code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Distribute to all dnodes; the audit record is written only when that succeeded.
  if ((code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq)) == 0) {
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", alterReq.keyType == 0 ? "SVR_KEY" : "DB_KEY",
                alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Community edition - no encryption support
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&alterReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc