• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.64
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
/* Record version written by mndDnodeActionEncode(); the decoder accepts versions 1..this value. */
#define TSDB_DNODE_VER_NUMBER 2
/* Number of reserved bytes placed after the fixed dnode fields in each raw record. */
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
/*
 * Human-readable text for each dnode offline reason.
 * NOTE(review): presumably indexed by the DND_REASON_* codes stored in
 * SDnodeObj.offlineReason; the enum is declared elsewhere, so the order here
 * must be kept in sync with it -- confirm against the header.
 */
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
67

68
/* Activation-code kinds (the users of these constants are not visible in this part of the file). */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
72

73
/* Dnode operation kinds (the users of these constants are not visible in this part of the file). */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109

110
#ifdef _GRANT
111
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
112
#else
113
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
114
#endif
115

116
/**
 * Installs the dnode message handlers and show-retrieve handlers on the
 * mnode, then registers the SDB_DNODE table description with the sdb.
 *
 * @param pMnode mnode whose handler tables and sdb are populated.
 * @return whatever sdbSetTable() returns.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* Callbacks the sdb invokes for SDB_DNODE rows, keyed by int32 dnode id. */
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  /* Dnode lifecycle and status messages. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);

  /* Monitoring and audit messages. */
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);

  /* Encryption key, dnode info and TLS reload messages. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  /* Retrieval handlers for the configs and dnode management tables. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  /* Key synchronisation messages. */
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
154

155
/* Teardown counterpart of mndInitDnode(); there is currently nothing to release. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
401,833✔
156

157
/**
 * Deploy callback for the dnode table: builds the first dnode (id 1) from the
 * local fqdn/port and submits it through a "create-dnode" transaction.
 *
 * @param pMnode mnode that owns the transaction.
 * @return 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // A missing machine id is fatal here. Set the return code explicitly:
    // relying on tGetMachineId()'s result could report success (0) for a
    // deploy that never created the dnode.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // pRaw was appended to the transaction; clear the local pointer so the
  // cleanup below does not free it as well.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
208

209
/**
 * Serializes a dnode object into a newly allocated sdb raw record.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL on failure (terrno is then non-zero).
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
241

242
/**
 * Rebuilds a dnode row from an sdb raw record.
 *
 * @param pRaw record produced by mndDnodeActionEncode().
 * @return the new row, or NULL on failure (terrno is then non-zero).
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Assume failure until the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver >= 2) {
    // Version 2 records carry two length-prefixed fields; both are read and discarded.
    for (int32_t field = 0; field < 2; ++field) {
      int16_t skipLen = 0;
      SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
      SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    }
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
293

294
/**
 * sdb insert callback: marks the dnode as "status not received" and derives
 * its "fqdn:port" endpoint string.
 *
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  // Format into a scratch buffer first, then copy into the object.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
303

304
/* sdb delete callback: only traces the event; always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
308

309
/**
 * sdb update callback: copies the update time (and, in enterprise builds,
 * the machine id) from the new row into the existing one.
 *
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
317

318
/**
 * Acquires the dnode with the given id from the sdb.
 *
 * On failure the sdb-level terrno is translated into the matching
 * dnode-level code; any other value becomes TSDB_CODE_APP_ERROR.
 *
 * @return the acquired object (release with mndReleaseDnode()), or NULL.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SDnodeObj *pDnode = sdbAcquire(pMnode->pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  const int32_t sdbErr = terrno;
  if (sdbErr == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
  }

  return NULL;
}
336

337
/* Hands a dnode object back to the mnode's sdb via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
150,529,470✔
341

342
/**
 * Builds an endpoint set from the dnode's fqdn and port.
 * The result of addEpIntoEpSet() is stored in terrno.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  return result;
}
347

348
/**
 * Looks up a dnode by id and returns its endpoint set.
 *
 * @return the endpoint set, or a zeroed set when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
358

359
/**
 * Scans the dnode table for an entry whose endpoint string matches pEpStr
 * (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * @return the matching dnode, still held (the caller releases it), or NULL
 *         with terrno set to TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pCandidate = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pCandidate);

  while (pIter != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      // Found: stop the iteration but keep the reference for the caller.
      sdbCancelFetch(pSdb, pIter);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate);
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
379

380
/**
 * Same endpoint lookup as mndAcquireDnodeByEp(), but iterates with
 * sdbFetchAll() and does not set terrno when nothing matches.
 *
 * @return the matching dnode, still held (the caller releases it), or NULL.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;

    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &rowStatus, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // Found: stop the iteration but keep the reference for the caller.
    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }
}
400

401
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
405

406
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
410

411
/**
 * Reports whether the dnode's last access lies within tsStatusTimeoutMs of curMs.
 *
 * Side effect: when the dnode has timed out, has a positive rebootTime and is
 * still marked DND_REASON_ONLINE, its offlineReason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 *
 * @param pDnode dnode to check (may be modified, see above).
 * @param curMs  current time in milliseconds.
 * @return true when online, false when timed out.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  if (elapsedMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
421

422
/**
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * row returned by sdbFetchAll() on the dnode table.
 *
 * A failed array push is logged and the iteration continues (best effort).
 *
 * @param pMnode    mnode whose sdb is scanned.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed from the row before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
1,915,388✔
448

449
/**
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every row returned by sdbFetchAll() on the dnode table.
 *
 * @param pMnode     mnode whose sdb is scanned.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success; terrno from the failed array push otherwise
 *         (the iteration is cancelled in that case).
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialize so no indeterminate bytes are copied into the array,
    // then copy everything needed from the row before releasing it.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
481

482
/*
 * Compares one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
 * with the global of the same name; on mismatch logs it, stores err in terrno
 * and returns err from the enclosing function.
 *
 * Expects pCfg and pDnode to be in scope at the call site. Wrapped in
 * do { } while (0) so the expansion is a single statement and is safe inside
 * unbraced if/else.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = err;                                                                                        \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
488

489
/**
 * Compares the cluster configuration reported by a dnode with this node's
 * own settings, checking one parameter after another.
 *
 * @param pMnode mnode providing checkTime and the encryption state.
 * @param pDnode reporting dnode (used for log messages).
 * @param pCfg   configuration reported by the dnode.
 * @return DND_REASON_ONLINE when everything matches; otherwise the
 *         DND_REASON_* value of the first mismatch, with terrno set.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Monitor parameters: each macro returns from this function on mismatch.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /* The statusInterval comparison is intentionally disabled:
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  // A timezone mismatch only counts when the check times differ as well.
  if ((taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pMnode->checkTime != pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalize the local switch to 0/1 before comparing.
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // The encryption key comparison is skipped while the encrypting flag is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
554

555
/**
 * Computes how fast a counter advanced between two samples.
 *
 * @param currentCount  counter value at the newer sample.
 * @param lastCount     counter value at the older sample.
 * @param currentTimeMs timestamp of the newer sample, in milliseconds.
 * @param lastTimeMs    timestamp of the older sample, in milliseconds.
 * @return counts per millisecond, or 0.0 when either the time or the counter
 *         did not strictly increase.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs <= lastTimeMs) {
    return 0.0;
  }
  if (currentCount <= lastCount) {
    return 0.0;
  }

  const int64_t advanced = currentCount - lastCount;
  const int64_t elapsedMs = currentTimeMs - lastTimeMs;
  return (double)advanced / (double)elapsedMs;
}
565

566
/**
 * Applies a vnode load report to the cached vgroup member state.
 *
 * The index and buffer fields are always refreshed. The sync state, term,
 * restore/canRead flags and start/role times are only overwritten (and
 * logged) when one of them differs from the cached value.
 *
 * @param vgId   vgroup id, used for logging.
 * @param pGid   cached member state to update.
 * @param pVload load reported by the dnode.
 * @return true when the sync-related state changed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // Compare against the cached values before anything is overwritten below.
  const bool syncStateDiffers = (pGid->syncState != pVload->syncState);
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleTimeDiffers = (pGid->roleTimeMs != pVload->roleTimeMs);
  const bool roleChanged = syncStateDiffers || termDiffers || roleTimeDiffers;

  // While the applied index trails the commit index, track the apply rate.
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      const int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate =
          calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  const bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                       pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  // Log the old values first, then overwrite them.
  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
605

606
/**
 * Applies an mnode load report to the cached mnode object.
 *
 * @param pObj   cached mnode state to update.
 * @param pMload load reported by the dnode.
 * @return true when the sync state, term, role time or restore flag changed
 *         (the cached values are then overwritten and the change is logged).
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool syncStateDiffers = (pObj->syncState != pMload->syncState);
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleTimeDiffers = (pObj->roleTimeMs != pMload->roleTimeMs);
  const bool roleChanged = syncStateDiffers || termDiffers || roleTimeDiffers;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  // Log the old values first, then overwrite them.
  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
623

624
extern char   *tsMonFwUri;
625
extern char   *tsMonSlowLogUri;
UNCOV
626
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
×
UNCOV
627
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
628
  SStatisReq statisReq = {0};
×
UNCOV
629
  int32_t    code = -1;
×
630

UNCOV
631
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
×
632

UNCOV
633
  if (tsMonitorLogProtocol) {
×
UNCOV
634
    mInfo("process statis req,\n %s", statisReq.pCont);
×
635
  }
636

UNCOV
637
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
×
UNCOV
638
    monSendContent(statisReq.pCont, tsMonFwUri);
×
639
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
640
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
641
  }
642

UNCOV
643
  tFreeSStatisReq(&statisReq);
×
UNCOV
644
  return 0;
×
645
}
646

647
/**
 * Handles TDMT_MND_AUDIT: when auditing is enabled at data level or above,
 * decodes the request and records it via auditAddRecord().
 *
 * @return 0, or the deserialization error.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  // Nothing to record unless auditing is on and the level covers data operations.
  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);

  tFreeSAuditReq(&auditReq);
  return 0;
}
664

665
/**
 * Handles TDMT_MND_BATCH_AUDIT: when auditing is enabled at data level or
 * above, decodes the batch and records every entry via auditAddRecord().
 *
 * @return 0, or the deserialization error.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  // Nothing to record unless auditing is on and the level covers data operations.
  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq batchReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &batchReq));

  const int32_t total = taosArrayGetSize(batchReq.auditArr);
  for (int32_t idx = 0; idx < total; ++idx) {
    SAuditReq *pEntry = TARRAY_GET_ELEM(batchReq.auditArr, idx);
    mDebug("received audit req:%s, %s, %s, %s", pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql);

    auditAddRecord(pReq, pMnode->clusterId, pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql,
                   pEntry->sqlLen, pEntry->duration, pEntry->affectedRows);
  }

  tFreeSBatchAuditReq(&batchReq);
  return 0;
}
687

688
/**
 * Serializes the dnode's id and machine id into a TDMT_MND_UPDATE_DNODE_INFO
 * message and puts it on the mnode write queue.
 *
 * @param pMnode mnode whose message callbacks are used.
 * @param pDnode dnode whose info is sent.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so free it here; also never
    // report success for a message that was not sent.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): who owns pReq when tmsgPutToQueue() fails is not visible
  // here; presumably the queue layer frees it -- confirm before adding a free.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
718

719
// Handle an update-dnode-info request: refresh the dnode's updateTime and persist the dnode object through
// an "update-dnode-obj" transaction.
//
// pReq->pCont / pReq->contLen hold a serialized SDnodeInfoReq; only its dnodeId is read here.
//
// Returns 0 when the transaction was prepared, otherwise an error code.
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // Every NULL result below leaves through an explicit goto with a non-zero code, so a NULL pointer is never
  // dereferenced even when the failing call did not set terrno.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    lino = __LINE__;
    goto _exit;
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the sdbFreeRaw() at _exit does not touch the raw that was appended to the transaction.
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    lino = __LINE__;
    goto _exit;
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
765

766
// Handle a status request sent by a dnode.
//
// Steps, in order:
//   1. Deserialize the SStatusReq and reject it when it names a different cluster.
//   2. Locate the dnode object by id, falling back to a lookup by endpoint.
//   3. Compare the versions and flags reported by the dnode with the mnode's current ones to derive `needCheck`.
//   4. Reject the request when the dnode timestamp differs from the local one by tsTimestampDeltaLimit seconds or more.
//   5. Copy the reported vnode, mnode and qnode load into the corresponding objects.
//   6. When `needCheck` is set: validate software version and cluster id, refresh the dnode object and store a
//      serialized SStatusRsp in pReq->info.rsp / pReq->info.rspLen.
//   7. Update the dnode access counters and mark it online.
//
// Returns the error code of the first failing step, otherwise the result of mndUpdClusterInfo(pReq).
//
// `code` is overwritten by TAOS_CHECK_GOTO on success (NOTE(review): confirm against the macro definition), so
// it cannot serve as a "failed by default" marker: every error exit below assigns it explicitly in addition to
// terrno. Otherwise those exits would fall into the success return at _OVER.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has no id yet: it can only be found by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        code = (err != 0) ? err : TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto _OVER;
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        code = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) {
          code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto _OVER;
        }
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditTokenChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  if (tsAuditUseToken) {
    SDbObj *pDb = mndAcquireAuditDb(pMnode);
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      // On a parse failure `name` is still zeroed, so auditDB stays empty.
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
      mndReleaseDb(pMnode, pDb);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = 0;
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      // NOTE(review): the token is looked up for the literal user "audit", not for auditUser - confirm intended.
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken,
               tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true;
      }
    }
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged;
  // NOTE(review): `trace` is not referenced by name below; presumably consumed by the mGTrace macro - confirm.
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Both timestamps are in milliseconds; the limit is compared in whole seconds.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (labs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%" PRId64 " since %s, limit %" PRId64 "s", statusReq.dnodeId,
           pMnode->clusterId, tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the replica that reports itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role times default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      code = terrno;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        code = terrno;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        terrno = code;
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = terrno;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditDB[0] != '\0') {
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
    }
    if (auditToken[0] != '\0') {
      // NOTE(review): the audit token is written to the log at info level - confirm this is acceptable.
      mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken);
      tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
    }

    // Sizing pass first; the buffer is only allocated for a valid length.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never attached to pReq, so it has to be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1050

1051
// Handle a notify request sent by a dnode.
//
// For every vnode load entry in the request, the reported time-series count is copied into the matching
// vgroup object; entries whose vgroup cannot be acquired are ignored. The function then returns the result
// of mndUpdClusterInfo(pReq).
//
// A request that cannot be deserialized, or that names a different non-zero cluster id, is rejected with the
// corresponding error code. The decoded request is freed on every path.
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  int32_t    code = 0;
  SNotifyReq notifyReq = {0};
  SMnode    *pMnode = pReq->info.node;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t clusterid = mndGetClusterId(pMnode);
  bool    foreignCluster = (notifyReq.clusterId != 0) && (notifyReq.clusterId != clusterid);
  if (foreignCluster) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  int32_t idx = 0;
  while (idx < numOfLoads) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    ++idx;

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVgroup == NULL) {
      continue;
    }
    pVgroup->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVgroup);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1084

1085
// Create a new dnode object from a create request and persist it through a "create-dnode" transaction.
//
// The new object gets the next id from sdbGetMaxId(), the current time as created/updated time, and the
// fqdn/port of pCreate; its endpoint string is "<fqdn>:<port>".
//
// Returns 0 when the transaction was prepared, otherwise an error code.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  // Bound the write by the real size of the destination. A bound of TSDB_EP_LEN - 1 would cut the longest
  // possible endpoint one character short of what mndProcessCreateDnodeReq builds (with TSDB_EP_LEN) for its
  // duplicate lookup. The cast makes the argument match the %u conversion.
  (void)snprintf(dnodeObj.ep, sizeof(dnodeObj.ep), "%s:%u", pCreate->fqdn, (unsigned int)pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the sdbFreeRaw() at _OVER does not touch the raw that was appended to the transaction.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1125

1126
// Handle a dnode-list request: collect the id, fqdn and port of every dnode in the sdb and return them as a
// serialized SDnodeListRsp in pReq->info.rsp / pReq->info.rspLen.
//
// Returns 0 on success, otherwise an error code; no response buffer is attached on failure.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    // Never report success for a failed allocation, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: release the current object and cancel the open iteration.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Sizing pass first; the buffer is only allocated for a valid length.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was never attached to pReq, so it has to be released here. A zero length is an error too.
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1187

1188
void getSlowLogScopeString(int32_t scope, char *result) {
1,127✔
1189
  if (scope == SLOW_LOG_TYPE_NULL) {
1,127✔
1190
    (void)strncat(result, "NONE", 64);
×
1191
    return;
×
1192
  }
1193
  while (scope > 0) {
2,254✔
1194
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,127✔
1195
      (void)strncat(result, "QUERY", 64);
1,127✔
1196
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,127✔
1197
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1198
      (void)strncat(result, "INSERT", 64);
×
1199
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1200
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1201
      (void)strncat(result, "OTHERS", 64);
×
1202
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1203
    } else {
1204
      (void)printf("invalid slow log scope:%d", scope);
×
1205
      return;
×
1206
    }
1207

1208
    if (scope > 0) {
1,127✔
1209
      (void)strncat(result, "|", 64);
×
1210
    }
1211
  }
1212
}
1213

1214
// Handle a create-dnode request.
//
// Order of checks: grants (dnode count, CPU cores), request decoding, operation privilege, endpoint
// validity (non-empty fqdn, port in 1..UINT16_MAX), and that no dnode with the same "<fqdn>:<port>" exists.
// The dnode is then created through mndCreateDnode().
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was started, otherwise an error code.
// When tsAuditLevel is at least AUDIT_LEVEL_SYSTEM an audit record is written after the create attempt,
// whether or not it succeeded.
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  int32_t         code = -1;
  int32_t         lino = 0;
  int64_t         startMs = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code == 0) {
    code = grantCheck(TSDB_GRANT_CPU_CORES);
  }
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE),
                  &lino, _OVER);

  bool emptyFqdn = (createReq.fqdn[0] == 0);
  bool portOutOfRange = (createReq.port <= 0) || (createReq.port > UINT16_MAX);
  if (emptyFqdn || portOutOfRange) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  // Refuse to create a dnode whose endpoint is already registered.
  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    tsGrantHBInterval = 5;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed time since the handler started, in seconds.
    double duration = (double)(taosGetTimestampMs() - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1277

1278
// Implementation hook for restore-dnode requests. It is declared here and defined either by the stub further
// down (when TD_ENTERPRISE is not defined) or outside this section of the file.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1279

1280
// Handle a restore-dnode request by forwarding it unchanged to mndProcessRestoreDnodeReqImpl() and returning
// its result.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
1,947✔
1281

1282
// Stub used when TD_ENTERPRISE is not defined: ignores the request and reports success (0).
#ifndef TD_ENTERPRISE
1283
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1284
#endif
1285

1286
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,340✔
1287
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1288
  int32_t  code = -1;
9,340✔
1289
  SSdbRaw *pRaw = NULL;
9,340✔
1290
  STrans  *pTrans = NULL;
9,340✔
1291
  int32_t  lino = 0;
9,340✔
1292

1293
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,340✔
1294
  if (pTrans == NULL) {
9,340✔
1295
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1296
    if (terrno != 0) code = terrno;
×
1297
    goto _OVER;
×
1298
  }
1299
  mndTransSetGroupParallel(pTrans);
9,340✔
1300
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,340✔
1301
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,340✔
1302

1303
  pRaw = mndDnodeActionEncode(pDnode);
9,340✔
1304
  if (pRaw == NULL) {
9,340✔
1305
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1306
    if (terrno != 0) code = terrno;
×
1307
    goto _OVER;
×
1308
  }
1309
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,340✔
1310
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,340✔
1311
  pRaw = NULL;
9,340✔
1312

1313
  pRaw = mndDnodeActionEncode(pDnode);
9,340✔
1314
  if (pRaw == NULL) {
9,340✔
1315
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1316
    if (terrno != 0) code = terrno;
×
1317
    goto _OVER;
×
1318
  }
1319
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,340✔
1320
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,340✔
1321
  pRaw = NULL;
9,340✔
1322

1323
  if (pSObj != NULL) {
9,340✔
1324
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
555✔
1325
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
555✔
1326
  }
1327

1328
  if (pMObj != NULL) {
9,340✔
1329
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
240✔
1330
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
240✔
1331
  }
1332

1333
  if (pQObj != NULL) {
9,340✔
1334
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
156✔
1335
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
156✔
1336
  }
1337

1338
  if (pBObj != NULL) {
9,340✔
1339
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
756✔
1340
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
756✔
1341
  }
1342

1343
  if (numOfVnodes > 0) {
8,584✔
1344
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
7,084✔
1345
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
7,084✔
1346
  }
1347

1348
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,584✔
1349

1350
  code = 0;
8,584✔
1351

1352
_OVER:
9,340✔
1353
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,340✔
1354
  mndTransDrop(pTrans);
9,340✔
1355
  sdbFreeRaw(pRaw);
9,340✔
1356
  TAOS_RETURN(code);
9,340✔
1357
}
1358

1359
// Report whether the dnode with the given id hosts nothing.
//
// Returns true only when no qnode, no snode and no mnode can be acquired for dnodeId and
// mndGetVnodesNum() reports no vnodes on it. The checks run in that order and stop at the first hit;
// every object that was acquired is released before returning.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       isEmpty = false;
  SMnodeObj *pMObj = NULL;
  SSnodeObj *pSObj = NULL;
  SQnodeObj *pQObj = mndAcquireQnode(pMnode, dnodeId);

  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSObj == NULL) {
      pMObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMObj == NULL) {
        isEmpty = !(mndGetVnodesNum(pMnode, dnodeId) > 0);
      }
    }
  }

  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1384

1385
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
9,993✔
1386
  SMnode       *pMnode = pReq->info.node;
9,993✔
1387
  int32_t       code = -1;
9,993✔
1388
  SDnodeObj    *pDnode = NULL;
9,993✔
1389
  SMnodeObj    *pMObj = NULL;
9,993✔
1390
  SQnodeObj    *pQObj = NULL;
9,993✔
1391
  SSnodeObj    *pSObj = NULL;
9,993✔
1392
  SBnodeObj    *pBObj = NULL;
9,993✔
1393
  SDropDnodeReq dropReq = {0};
9,993✔
1394
  int64_t       tss = taosGetTimestampMs();
9,993✔
1395

1396
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
9,993✔
1397

1398
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
9,993✔
1399
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1400
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);
9,993✔
1401

1402
  bool force = dropReq.force;
9,993✔
1403
  if (dropReq.unsafe) {
9,993✔
1404
    force = true;
×
1405
  }
1406

1407
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
9,993✔
1408
  if (pDnode == NULL) {
9,993✔
1409
    int32_t err = terrno;
×
1410
    char    ep[TSDB_EP_LEN + 1] = {0};
×
1411
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
1412
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
1413
    if (pDnode == NULL) {
×
1414
      code = err;
×
1415
      goto _OVER;
×
1416
    }
1417
  }
1418

1419
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
9,993✔
1420
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
9,993✔
1421
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
9,993✔
1422
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
9,993✔
1423
  if (pMObj != NULL) {
9,993✔
1424
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
893✔
1425
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
413✔
1426
      goto _OVER;
413✔
1427
    }
1428
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
480✔
1429
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
240✔
1430
      goto _OVER;
240✔
1431
    }
1432
  }
1433

1434
#ifdef USE_MOUNT
1435
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
9,340✔
1436
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
1437
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1438
    goto _OVER;
×
1439
  }
1440
#endif
1441

1442
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
9,340✔
1443
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
9,340✔
1444

1445
  if (isonline && force) {
9,340✔
1446
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1447
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1448
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1449
    goto _OVER;
×
1450
  }
1451

1452
  mError("vnode num:%d", numOfVnodes);
9,340✔
1453

1454
  bool    vnodeOffline = false;
9,340✔
1455
  void   *pIter = NULL;
9,340✔
1456
  int32_t vgId = -1;
9,340✔
1457
  while (1) {
21,161✔
1458
    SVgObj *pVgroup = NULL;
30,501✔
1459
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
30,501✔
1460
    if (pIter == NULL) break;
30,501✔
1461

1462
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
63,808✔
1463
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
42,647✔
1464
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
42,647✔
1465
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
14,205✔
1466
          vgId = pVgroup->vgId;
×
1467
          vnodeOffline = true;
×
1468
          break;
×
1469
        }
1470
      }
1471
    }
1472

1473
    sdbRelease(pMnode->pSdb, pVgroup);
21,161✔
1474

1475
    if (vnodeOffline) {
21,161✔
1476
      sdbCancelFetch(pMnode->pSdb, pIter);
×
1477
      break;
×
1478
    }
1479
  }
1480

1481
  if (vnodeOffline && !force) {
9,340✔
1482
    code = TSDB_CODE_VND_VNODE_OFFLINE;
×
1483
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
×
1484
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1485
    goto _OVER;
×
1486
  }
1487

1488
  if (!isonline && !force) {
9,340✔
1489
    code = TSDB_CODE_DNODE_OFFLINE;
×
1490
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
×
1491
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1492
    goto _OVER;
×
1493
  }
1494

1495
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
9,340✔
1496
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
9,340✔
1497

1498
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
9,340✔
1499
    char obj1[30] = {0};
9,340✔
1500
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
9,340✔
1501

1502
    int64_t tse = taosGetTimestampMs();
9,340✔
1503
    double  duration = (double)(tse - tss);
9,340✔
1504
    duration = duration / 1000;
9,340✔
1505
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
9,340✔
1506
  }
1507

1508
_OVER:
9,993✔
1509
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
9,993✔
1510
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
1,409✔
1511
  }
1512

1513
  mndReleaseDnode(pMnode, pDnode);
9,993✔
1514
  mndReleaseMnode(pMnode, pMObj);
9,993✔
1515
  mndReleaseQnode(pMnode, pQObj);
9,993✔
1516
  mndReleaseBnode(pMnode, pBObj);
9,993✔
1517
  mndReleaseSnode(pMnode, pSObj);
9,993✔
1518
  tFreeSDropDnodeReq(&dropReq);
9,993✔
1519
  TAOS_RETURN(code);
9,993✔
1520
}
1521

1522
// Broadcast a create-encrypt-key request to the online dnodes.
//
// pReq:     originating RPC message; pReq->info.node supplies the mnode.
// dnodeId:  target dnode id; -1 or 0 addresses every online dnode.
// pDcfgReq: request forwarded to the dnodes; the length of pDcfgReq->value must lie in
//           [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
//
// Returns 0 on success, otherwise an error code (also stored in terrno when terrno is 0).
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  // NOTE(review): presumably consumed implicitly by the mG* logging macros - confirm.
  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the "encrypting" flag (0 -> 1); a non-zero previous value means another request owns it.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  // Reset the per-round counters before any request is sent.
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      // A failed allocation is skipped silently: the broadcast is best-effort per dnode.
      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so it must be freed here.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Leaving the loop early: give the iterator back.
          sdbCancelFetch(pSdb, pIter);
          // break (not goto _exit) so the nEncrypt check below can still clear the flag.
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request went out, so no response will ever clear the flag: clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1599

1600
// RPC handler for a create-encrypt-key request.
//
// With TD_ENTERPRISE or TD_ASTRA_TODO defined: deserializes the SMCfgDnodeReq, checks the
// MND_OPER_CONFIG_DNODE privilege, and forwards an "encrypt_key" config to
// mndProcessCreateEncryptKeyReqImpl. Any other config name yields TSDB_CODE_PAR_INTERNAL_ERROR.
// Otherwise the handler returns 0 without doing anything.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  SDCfgDnodeReq dcfgReq = {0};

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq)) != 0) {
    // Release whatever a partial decode may have attached to the request.
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 covers "encrypt_key" plus its terminator, so only an exact
  // (case-insensitive) match is accepted.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy everything needed out of cfgReq before it is freed.
  int32_t dnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1629

1630
// RPC handler for one dnode's response to a create-encrypt-key request.
//
// Bumps the success or failure counter according to pRsp->code and clears the mnode's
// "encrypting" flag once success + failure responses reach the number of requests sent.
// Always returns 0.
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Increment the counter for this response and load the other one, so the completion
  // test below sees both totals. Summing only the incremented counter would never
  // reach nReq when successes and failures are mixed.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  // NOTE(review): presumably consumed implicitly by the mG* logging macros - confirm.
  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1654

1655
// Show-retrieve callback that fills pBlock with a fixed list of (option name, value) rows
// taken from global settings: statusIntervalMs, timezone, locale, charset, monitor,
// monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope.
//
// Column 0 receives the option name and column 1 its formatted value, both as varstrings.
// The `rows` argument is not consulted. Returns the number of rows completely written and
// adds the same amount to pShow->numOfRows.
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *optNames[TSDB_CONFIG_NUMBER] = {0};
  char    optVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t nOpts = 0;
  int32_t nFilled = 0;
  int32_t code = 0;
  int32_t lino = 0;

// Record one option: remember its name, format its value, advance the slot index.
#define MND_PUT_CFG(NAME, FMT, VAL)                                      \
  do {                                                                   \
    optNames[nOpts] = (NAME);                                            \
    (void)snprintf(optVals[nOpts], TSDB_CONFIG_VALUE_LEN, FMT, (VAL));   \
    ++nOpts;                                                             \
  } while (0)

  MND_PUT_CFG("statusIntervalMs", "%d", tsStatusIntervalMs);
  MND_PUT_CFG("timezone", "%s", tsTimezoneStr);
  MND_PUT_CFG("locale", "%s", tsLocale);
  MND_PUT_CFG("charset", "%s", tsCharset);
  MND_PUT_CFG("monitor", "%d", tsEnableMonitor);
  MND_PUT_CFG("monitorInterval", "%d", tsMonitorInterval);
  MND_PUT_CFG("slowLogThreshold", "%d", tsSlowLogThreshold);
  MND_PUT_CFG("slowLogMaxLen", "%d", tsSlowLogMaxLen);

  // The slow-log scope is rendered to text by a helper before being stored.
  char slowLogScope[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, slowLogScope);
  MND_PUT_CFG("slowLogScope", "%s", slowLogScope);

#undef MND_PUT_CFG

  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t i = 0; i < nOpts; i++) {
    int32_t          col = 0;
    SColumnInfoData *pCol = NULL;

    // Column 0: option name.
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, optNames[i], TSDB_CONFIG_OPTION_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)nameVar, false), &lino, _OVER);

    // Column 1: option value.
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, optVals[i], TSDB_CONFIG_VALUE_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, nFilled, (const char *)valueVar, false), &lino, _OVER);

    nFilled++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nFilled;
  return nFilled;
}
1726

1727
// Cancel hook that intentionally does nothing with either argument.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
}
×
1728

1729
// Show-retrieve callback that writes up to `rows` dnode rows into pBlock.
//
// Iteration state lives in pShow->pIter, so successive calls continue where the previous one
// stopped. Columns are written in this order: dnode id, endpoint, current vnode count,
// supported vnode count, status text, create time, reboot time, offline reason and, when
// TD_ENTERPRISE is defined, machine id.
//
// Returns the number of rows completely written and adds the same amount to pShow->numOfRows.
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // cols now indexes the endpoint column, whose schema width bounds the copy.
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t vnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vnodes, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status text: the sdb object status decides the base word; an offline dnode gets a
    // trailing '*' on creating/dropping, or plain "offline" otherwise.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason is empty for an online dnode.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Never write through a failed allocation; report it like any other column error.
      RETRIEVE_CHECK_GOTO(terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary before the error check so a failing set cannot leak it.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1811

1812
// Cancel hook for a dnode listing: hands pIter back to the sdb for the SDB_DNODE table.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1816

1817
// Collect a heap-allocated copy of every dnode's FQDN.
//
// Returns an array of `char *` (each element created with taosStrdup), or NULL if the array
// cannot be created or an FQDN cannot be duplicated. An FQDN that cannot be appended to the
// array is dropped and iteration continues.
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error before any later call can overwrite terrno.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      // Log while the object is still held, then release it.
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      // Leaving the loop early: give the iterator back.
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so nobody else will free this copy.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    // Undo the partial result: free every duplicated string, then the array itself.
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1859

1860
// RPC handler for a dnode's key-sync request.
//
// Compares the key version reported by the dnode with the mnode's own. With TD_ENTERPRISE
// and TD_HAS_TAOSK defined, a mismatch makes the response carry the mnode's keys and
// metadata with needUpdate = 1; a match only echoes the version. Without those defines the
// response always carries version 0 and needUpdate = 0.
//
// On success the serialized SKeySyncRsp is attached to pReq->info.rsp / rspLen.
// Returns 0 on success, otherwise an error code.
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Snapshot of the mnode's key material; stays zeroed unless the keys are loaded.
  char    svrKey[129] = {0};
  char    dbKey[129] = {0};
  char    cfgKey[129] = {0};
  char    metaKey[129] = {0};
  char    dataKey[129] = {0};
  int32_t algorithm = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
    keyVersion = tsEncryptKeyVersion;
    tstrncpy(svrKey, tsSvrKey, 128);
    tstrncpy(dbKey, tsDbKey, 128);
    tstrncpy(cfgKey, tsCfgKey, 128);
    tstrncpy(metaKey, tsMetaKey, 128);
    tstrncpy(dataKey, tsDataKey, 128);
    algorithm = tsEncryptAlgorithmType;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
  } else {
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
  }

  // Check if dnode needs update
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);

    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 0;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
1970

1971
// Handler for key-sync responses: ignores the message and reports success.
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  return 0;
}
×
1972

1973
// Map an sdb object of the given type to the dnode it refers to.
//
// An SDB_DNODE object is returned as-is; SDB_QNODE, SDB_SNODE and SDB_BNODE objects yield
// their pDnode member. Returns NULL when p is NULL or the type is none of these.
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pFound = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pFound = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pFound = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pFound = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pFound = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pFound;
}
1990
// Append the endpoint set of every sdb object of `type` to pAddr.
//
// Each object is mapped to its dnode through getDnodeObjByType; objects without a dnode are
// logged and skipped. Returns 0 on success, or an error code if an endpoint set cannot be
// appended (entries appended before the failure remain in pAddr).
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Capture the error before any later call can overwrite terrno.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // Leaving the loop early: give the iterator back.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2020

2021
// Gather endpoint sets for qnodes, snodes, bnodes and dnodes, in that order, into pAddr.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the first error
// reported by mndGetAllNodeAddrByType, or 0 when every type was collected.
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  const int32_t         numOfTypes = (int32_t)(sizeof(nodeTypes) / sizeof(nodeTypes[0]));

  int32_t code = 0;
  int32_t lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  // Stop at the first type whose collection fails.
  for (int32_t i = 0; i < numOfTypes; ++i) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, nodeTypes[i], pAddr), &lino, _error);
  }

_error:
  return code;
}
2043

2044
// RPC handler that asks every known node endpoint to reload its TLS configuration.
//
// Deserializes the SMCfgDnodeReq, checks the MND_OPER_ALTER_DNODE_RELOAD_TLS privilege,
// collects endpoint sets via mndGetAllNodeAddr and sends an empty TDMT_DND_RELOAD_DNODE_TLS
// message to each. Send failures are logged and do not stop the loop.
//
// Returns the deserialize/privilege/allocation error if one occurred; otherwise the result
// of the last send, or the address-collection result when nothing was sent.
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  // Must be NULL-initialized: the early `goto _OVER` paths below reach
  // taosArrayDestroy(pAddr) before the array is created.
  SArray       *pAddr = NULL;
  SMCfgDnodeReq req = {0};

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  code = mndGetAllNodeAddr(pMnode, pAddr);
  if (code != 0) {
    // Best effort: still notify whatever endpoints were collected before the failure.
    mError("failed to collect all node addrs for tls reload since %s", tstrerror(code));
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    code = tmsgSendReq(pEpSet, &rpcMsg);
    if (code != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2082

2083
// Handler for a dnode's reply to a TLS reload request: logs the outcome carried in
// pRsp->code and always returns 0.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }
  return 0;
}
2092

2093
// Validate an alter-encrypt-key request and forward it to every online dnode.
//
// pReq:      originating RPC message; pReq->info.node supplies the mnode.
// pAlterReq: keyType must be 0 (SVR_KEY) or 1 (DB_KEY); the length of newKey must lie in
//            [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
//
// The forwarded copy carries only keyType and newKey (sql is left empty). A failed send to
// one dnode is logged and does not stop the loop.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA / TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN on
// validation failure, or the serialization result on encode failure (also stored in terrno
// when terrno is 0).
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // NOTE(review): presumably consumed implicitly by the mG* logging macros - confirm.
  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
    void   *pBuf = rpcMallocCont(bufLen);

    // A failed allocation is skipped silently: the broadcast is best-effort per dnode.
    if (pBuf != NULL) {
      if ((bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, bufLen, &alterKeyReq)) <= 0) {
        code = bufLen;
        // The buffer was never handed to the transport, so it must be freed here.
        rpcFreeCont(pBuf);
        sdbRelease(pSdb, pDnode);
        // Leaving the loop early: give the iterator back.
        sdbCancelFetch(pSdb, pIter);
        goto _exit;
      }
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
      if (ret != 0) {
        mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
      } else {
        mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2168

2169
// RPC handler for an alter-encrypt-key request.
//
// Checks the MND_OPER_CONFIG_DNODE privilege, deserializes the request and, when
// TD_ENTERPRISE and TD_HAS_TAOSK are defined, hands it to mndProcessAlterEncryptKeyReqImpl
// and writes an audit record on success. Without those defines the handler fails with
// TSDB_CODE_OPS_NOT_SUPPORT.
//
// Returns 0 on success, otherwise an error code.
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  SMnode              *pMnode = pReq->info.node;
  SMAlterEncryptKeyReq alterReq = {0};
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;

  // Check privilege - only admin can alter encryption keys.
  // The user is taken through RPC_MSG_USER, as the other privilege checks in this file do.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Deserialize request
  code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq);
  if (code != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Process and distribute to all dnodes
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq);
  if (code == 0) {
    // Audit log
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", alterReq.keyType == 0 ? "SVR_KEY" : "DB_KEY",
                alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Community edition - no encryption support
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // alterReq is zero-initialized, so keyType reads as 0 when the request was never decoded.
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&alterReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc