• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4942

29 Jan 2026 06:55AM UTC coverage: 66.788% (-0.08%) from 66.868%
#4942

push

travis-ci

web-flow
fix: explain no response issue (#34435)

17 of 18 new or added lines in 3 files covered. (94.44%)

672 existing lines in 122 files now uncovered.

204165 of 305690 relevant lines covered (66.79%)

125991233.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.15
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
static const char *offlineReason[] = {
46
    "",
47
    "status msg timeout",
48
    "status not received",
49
    "version not match",
50
    "dnodeId not match",
51
    "clusterId not match",
52
    "statusInterval not match",
53
    "timezone not match",
54
    "locale not match",
55
    "charset not match",
56
    "ttlChangeOnWrite not match",
57
    "enableWhiteList not match",
58
    "encryptionKey not match",
59
    "monitor not match",
60
    "monitor switch not match",
61
    "monitor interval not match",
62
    "monitor slow log threshold not match",
63
    "monitor slow log sql max len not match",
64
    "monitor slow log scope not match",
65
    "unknown",
66
};
67

68
enum {
69
  DND_ACTIVE_CODE,
70
  DND_CONN_ACTIVE_CODE,
71
};
72

73
enum {
74
  DND_CREATE,
75
  DND_ADD,
76
  DND_DROP,
77
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109

110
#ifdef _GRANT
111
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
112
#else
113
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
114
#endif
115

116
/*
 * Set up the dnode module of an mnode: registers the handlers for the
 * dnode-related messages, the SHOW retrieve / free-iterator callbacks for the
 * configs and dnode tables, and finally the SDB_DNODE table itself.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* SDB callbacks for dnode rows; fields not listed here stay zero. */
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  /* Message handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  /* SHOW support for the configs and dnode tables. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  /* Key synchronisation and encryption-key alteration. */
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
154

155
/* Cleanup hook of the dnode module; it currently performs no work. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
397,480✔
156

157
/*
 * Build the default dnode object (id 1, local fqdn/port) and submit it through
 * a "create-dnode" transaction. Registered as the deploy callback of the
 * SDB_DNODE table.
 *
 * Returns 0 on success, otherwise a non-zero error code. The transaction
 * handle and any raw record still referenced by pRaw are released on exit.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    // Never report success when no machine id is available: tGetMachineId()
    // may have returned 0 while leaving machineId NULL.
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  // NOTE(review): if the transaction already owns pRaw after a successful
  // append, a failure of sdbSetRawStatus() would free it twice at _OVER
  // (sdbFreeRaw + mndTransDrop) - confirm the ownership rules.
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  pRaw = NULL;  // do not free the raw record at _OVER past this point

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
208

209
/*
 * Serialise a dnode object into a freshly allocated SDB raw record
 * (type SDB_DNODE, version TSDB_DNODE_VER_NUMBER).
 *
 * Returns the raw record on success. On failure the record is freed, terrno
 * is left non-zero and NULL is returned.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  /* Pessimistic default; cleared only after every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  /* Two zero 16-bit values kept for forward/backward compatibility. */
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
241

242
/*
 * Rebuild a dnode row from an SDB raw record.
 *
 * Accepts software versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1
 * two length-prefixed binary fields follow the reserve area; they are read
 * into a NULL destination and not stored in the object.
 *
 * Returns the allocated row on success; on failure frees the row, leaves
 * terrno non-zero and returns NULL.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  /* Pessimistic default; cleared only after every field has been read. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver >= 2) {
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
293

294
/*
 * SDB insert callback for dnode rows: marks the dnode as "status not received"
 * and derives its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  /* Format into a scratch buffer first, then copy into the object. */
  (void)snprintf(endpoint, sizeof(endpoint) - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
303

304
/* SDB delete callback for dnode rows: only traces the event. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
308

309
/*
 * SDB update callback for dnode rows: copies the update time from the new row
 * into the existing one and, in TD_ENTERPRISE / TD_ASTRA_TODO builds, the
 * machine id as well. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
317

318
/*
 * Acquire the dnode object with the given id from the SDB.
 *
 * Returns the object, or NULL when sdbAcquire() fails. On failure the SDB
 * error in terrno is translated to its dnode-specific equivalent
 * (not-exist / in-creating / in-dropping); any other error becomes
 * TSDB_CODE_APP_ERROR and is logged as fatal.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
      // The message used to say "db", although this function acquires a dnode.
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    }
  }

  return pDnode;
}
336

337
/* Hand a dnode object back to the mnode's SDB via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
155,535,955✔
341

342
/*
 * Build an endpoint set from the dnode's fqdn and port.
 * terrno is overwritten with the result of addEpIntoEpSet().
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  return result;
}
347

348
/*
 * Look up a dnode by id and return its endpoint set.
 * When the dnode cannot be acquired a zero-initialised set is returned.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
358

359
/*
 * Find a dnode by its endpoint string (case-insensitive comparison, limited
 * to TSDB_EP_LEN bytes) by iterating the SDB_DNODE table with sdbFetch().
 *
 * The matching object is returned without calling sdbRelease() on it; objects
 * that do not match are released before the iteration continues. When nothing
 * matches, terrno is set to TSDB_CODE_MND_DNODE_NOT_EXIST and NULL is returned.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    /* Match found: stop the iteration early and return the object. */
    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
379

380
/*
 * Same lookup as mndAcquireDnodeByEp(), but iterates with sdbFetchAll() and
 * does not touch terrno when nothing matches.
 *
 * The matching object is returned without calling sdbRelease() on it; objects
 * that do not match are released before the iteration continues.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus status = 0;
    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &status, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    /* Match found: stop the iteration early and return the object. */
    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
400

401
/* Return sdbGetSize() for the SDB_DNODE table of this mnode. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
405

406
/* Return sdbGetSize() for the SDB_DB table of this mnode. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
410

411
/*
 * Decide whether a dnode counts as online at time curMs: the absolute
 * distance between its last access time and curMs must not exceed
 * tsStatusTimeoutMs.
 *
 * Side effect: when the dnode is found offline, has a positive rebootTime and
 * its offlineReason is still DND_REASON_ONLINE, the reason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  if (elapsedMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
421

422
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll() on the SDB_DNODE table.
 *
 * A failed taosArrayPush() is logged and the iteration continues, so the
 * resulting array may be incomplete.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after sdbRelease(); use the copied id.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
1,936,449✔
448

449
/*
 * Append one SDnodeInfo (id, fqdn, port, offline reason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll() on the SDB_DNODE table.
 *
 * Returns 0 on success. When taosArrayPush() fails the iteration is cancelled
 * and the terrno observed at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so that no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after sdbRelease(); use the copied id.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
481

482
/*
 * Compare one monitor parameter reported by a dnode (pCfg->monitorParas.para)
 * with the local global of the same name. On mismatch: log, set terrno to
 * TSDB_CODE_DNODE_INVALID_MONITOR_PARAS and return `err` (a DND_REASON_* value)
 * from the enclosing function.
 *
 * terrno receives a TSDB_CODE_* error code rather than the DND_REASON_* value.
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site; wrapped in
 * do { } while (0) so it behaves as a single statement.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
488

489
/*
 * Compare the cluster configuration reported by a dnode (pCfg) with the local
 * settings of this mnode.
 *
 * Checks run in a fixed order: monitor parameters, slow-log except-db,
 * timezone, locale, charset, ttlChangeOnWrite, enableWhiteList and finally
 * the encryption key. On the first mismatch an error is logged, terrno is set
 * and the corresponding DND_REASON_* value is returned. Returns
 * DND_REASON_ONLINE when every check passes.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Scalar monitor parameters, each with its own offline reason. */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
   * The statusIntervalMs comparison is disabled. When enabled it reported
   * TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL / DND_REASON_STATUS_INTERVAL_NOT_MATCH:
   *
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  /* A timezone difference only counts when the check times differ as well. */
  const bool timezoneDiffers = taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0;
  if (timezoneDiffers && pMnode->checkTime != pCfg->checkTime) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* Normalise the local switch to 0/1 before comparing. */
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* The key comparison is skipped while the encrypting flag is set. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
554

555
/*
 * Rate of applied entries per millisecond between two samples.
 *
 * Returns (currentCount - lastCount) / (currentTimeMs - lastTimeMs) when both
 * the count and the time have strictly advanced, and 0.0 otherwise.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t countDelta = currentCount - lastCount;
    int64_t elapsedMs = currentTimeMs - lastTimeMs;
    return (double)countDelta / (double)elapsedMs;
  }
  return 0.0;
}
565

566
/*
 * Merge the load reported for one vnode replica (pVload) into the mnode's
 * record of it (pGid).
 *
 * Progress counters (applied/commit/total index, buffer segment usage, learner
 * progress, snapshot sequence) are always copied. While the reported commit
 * index is ahead of the applied index, the applied rate and its timestamp are
 * refreshed. Sync state, term, restore/canRead flags, start time and role time
 * are copied only when one of them differs.
 *
 * Returns true when that last group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  /* Evaluated up front; a reported term of -1 is ignored in the comparison. */
  const bool stateDiffers = pGid->syncState != pVload->syncState;
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleTimeDiffers = pGid->roleTimeMs != pVload->roleTimeMs;
  const bool roleChanged = stateDiffers || termDiffers || roleTimeDiffers;

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      /* First observation: only remember when it happened. */
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;
  pGid->syncTotalIndex = pVload->syncTotalIndex;

  /* Log while the snapshot sequence is inside (0, SYNC_SNAPSHOT_SEQ_END) or the replica is a learner. */
  if ((pVload->snapSeq > 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END) ||
      pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  const bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                       pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
615

616
/*
 * Merge the load reported for an mnode (pMload) into its object (pObj).
 *
 * Sync state, term, restore flag and role time are copied when the sync
 * state, the term (a reported term of -1 is ignored in the comparison), the
 * role time or the restore flag differs. Returns true when the object was
 * updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool stateDiffers = pObj->syncState != pMload->syncState;
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleTimeDiffers = pObj->roleTimeMs != pMload->roleTimeMs;
  const bool roleChanged = stateDiffers || termDiffers || roleTimeDiffers;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
633

634
extern char   *tsMonFwUri;
635
extern char   *tsMonSlowLogUri;
636
/*
 * Handle a statistics request: deserialise it and pass its content to
 * monSendContent() with tsMonFwUri for counter data or tsMonSlowLogUri for
 * slow-log data. Other types are ignored.
 *
 * Returns the deserialisation error if decoding fails, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatisReq statisReq = {0};
  int32_t    code = -1;

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  switch (statisReq.type) {
    case MONITOR_TYPE_COUNTER:
      monSendContent(statisReq.pCont, tsMonFwUri);
      break;
    case MONITOR_TYPE_SLOW_LOG:
      monSendContent(statisReq.pCont, tsMonSlowLogUri);
      break;
    default:
      break;
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
656

657
/*
 * Handle a single audit request. Does nothing unless auditing is enabled and
 * tsAuditLevel is at least AUDIT_LEVEL_DATA; otherwise the request is
 * deserialised and handed to auditAddRecord().
 *
 * Returns the deserialisation error if decoding fails, otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);

  tFreeSAuditReq(&auditReq);
  return 0;
}
674

675
/*
 * Handle a batch audit request. Does nothing unless auditing is enabled and
 * tsAuditLevel is at least AUDIT_LEVEL_DATA; otherwise the batch is
 * deserialised and each entry is handed to auditAddRecord().
 *
 * Returns the deserialisation error if decoding fails, otherwise 0.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  int32_t nAudit = taosArrayGetSize(auditReq.auditArr);
  int32_t idx = 0;
  while (idx < nAudit) {
    SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, idx);
    mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);

    auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
                   audit->duration, audit->affectedRows);
    ++idx;
  }

  tFreeSBatchAuditReq(&auditReq);
  return 0;
}
697

698
/*
 * Queue an asynchronous TDMT_MND_UPDATE_DNODE_INFO message carrying the
 * dnode's id and machine id onto the mnode write queue.
 *
 * pMnode  mnode whose message callbacks are used to enqueue the request
 * pDnode  dnode whose id / machineId are serialized into the request
 *
 * Returns 0 when the message was queued, otherwise an error code (also stored
 * in terrno by TAOS_RETURN).
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded size.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so it must be released here.
    // A zero length is mapped to an error so the failure is not reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  // NOTE(review): pReq is not freed here if the put fails; this assumes
  // tmsgPutToQueue releases pCont itself on failure - confirm.
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
728

729
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: persist the current in-memory dnode
 * object through a "update-dnode-obj" transaction.
 *
 * Steps: decode the request, acquire the dnode by id, create a rollback
 * transaction, refresh updateTime, append the encoded dnode as a READY commit
 * log and prepare the transaction.
 *
 * Returns 0 on success, otherwise the failing step's error code.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (NULL == pDnode) {
    // NOTE(review): relies on terrno holding an error here; if it did not,
    // execution would continue with a NULL pDnode - confirm.
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (NULL == pTrans) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (NULL == pCommitRaw) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the cleanup below does not free the raw after it was appended.
  pCommitRaw = NULL;

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
775

776
/*
 * Handle a dnode status (heartbeat) request.
 *
 * Flow:
 *   1. decode the request and reject dnodes that report a different cluster id;
 *   2. locate the dnode object (by endpoint on first access, else by id with
 *      endpoint fallbacks);
 *   3. work out whether a full check/response is needed (offline, reboot,
 *      version / whitelist / audit changes ...);
 *   4. reject requests whose clock differs too much from the mnode;
 *   5. fold the reported vnode / mnode / qnode loads into the in-memory objects;
 *   6. when a check is needed, validate version and cluster id, refresh the
 *      dnode object and build the status response;
 *   7. update access statistics.
 *
 * TAOS_CHECK_GOTO stores the callee's result in `code`, so after a successful
 * decode `code` is 0 rather than its initial -1. Every failure path therefore
 * assigns `code` explicitly; otherwise the failure would be reported as success
 * and mndUpdClusterInfo() would run.
 *
 * Returns 0 / the result of mndUpdClusterInfo() on success, otherwise an error code.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // First access: the dnode does not know its id yet, look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // Endpoint is known but registered under a different dnode id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        code = (err != 0) ? err : TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto _OVER;
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        code = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) {
          code = (terrno != 0) ? terrno : err;
          if (code == 0) code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto _OVER;
        }
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditTokenChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  if (tsAuditUseToken) {
    // Compare the audit database name known to the mnode with the one the dnode reports.
    SDbObj *pDb = mndAcquireAuditDb(pMnode);
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
      mndReleaseDb(pMnode, pDb);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    // Same comparison for the active token of the audit user.
    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = 0;
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken,
               tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true;
      }
    }
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Reject dnodes whose wall clock differs too much (in seconds) from the mnode.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (labs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }

  // Fold each reported vnode load into its vgroup.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only the leader's numbers are copied into the vgroup statistics.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start / role times default to the dnode reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      code = terrno;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        code = terrno;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
            "enableWhiteListChanged:%d auditDBChanged:%d auditTokenChanged:%d pMnode->ipWhiteVer:%" PRId64
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditTokenChanged,
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // Machine id changed: update memory now and persist it asynchronously.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        code = terrno;
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = terrno;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditDB[0] != '\0') {
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
    }
    if (auditToken[0] != '\0') {
      mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken);
      tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
    }

    // First pass with a NULL buffer only computes the encoded size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen <= 0) {
      code = (contLen < 0) ? contLen : TSDB_CODE_OUT_OF_MEMORY;
      taosArrayDestroy(statusRsp.pDnodeEps);
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      taosArrayDestroy(statusRsp.pDnodeEps);
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      code = contLen;
      // The buffer was never attached to the response, release it here.
      rpcFreeCont(pHead);
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1065

1066
/*
 * Handle a dnode notify request: copy the reported time-series count of each
 * vnode into its vgroup, then refresh the cluster info.
 *
 * Requests carrying a non-zero cluster id different from this cluster are
 * rejected with TSDB_CODE_MND_DNODE_DIFF_CLUSTER.
 *
 * Returns the decode error, the cluster mismatch error, or the result of
 * mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t clusterid = mndGetClusterId(pMnode);
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t total = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t idx = 0; idx < total; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      continue;  // vgroup unknown to this mnode, nothing to update
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1099

1100
/*
 * Build and prepare the "create-dnode" transaction for a new dnode.
 *
 * pMnode   owning mnode
 * pReq     originating RPC request, attached to the transaction
 * pCreate  decoded create request providing fqdn and port
 *
 * The new dnode takes the id returned by sdbGetMaxId() and is written as a
 * READY commit log. Returns 0 once the transaction is prepared, otherwise an
 * error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  SSdbRaw  *pRaw = NULL;
  STrans   *pTrans = NULL;
  SDnodeObj dnodeObj = {0};

  // Fill in the object that will be persisted.
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (NULL == pTrans) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (NULL == pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free the raw after it was appended.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1140

1141
/*
 * Handle a dnode-list request: collect id, fqdn and port of every dnode in the
 * sdb and return them as a serialized SDnodeListRsp.
 *
 * On success the response buffer is attached to pReq->info.rsp / rspLen and 0
 * is returned. On failure an error code is returned and no response buffer is
 * left allocated.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: drop the object and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = (rspLen < 0) ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was never attached to the response, so release it here, and
    // make sure a zero length is not reported as success.
    code = (rspLen < 0) ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1202

1203
void getSlowLogScopeString(int32_t scope, char *result) {
1,148✔
1204
  if (scope == SLOW_LOG_TYPE_NULL) {
1,148✔
1205
    (void)strncat(result, "NONE", 64);
×
1206
    return;
×
1207
  }
1208
  while (scope > 0) {
2,296✔
1209
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,148✔
1210
      (void)strncat(result, "QUERY", 64);
1,148✔
1211
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,148✔
1212
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1213
      (void)strncat(result, "INSERT", 64);
×
1214
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1215
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1216
      (void)strncat(result, "OTHERS", 64);
×
1217
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1218
    } else {
1219
      (void)printf("invalid slow log scope:%d", scope);
×
1220
      return;
×
1221
    }
1222

1223
    if (scope > 0) {
1,148✔
1224
      (void)strncat(result, "|", 64);
×
1225
    }
1226
  }
1227
}
1228

1229
/*
 * Handle CREATE DNODE.
 *
 * Checks the dnode and cpu-core grants, decodes the request, verifies the
 * caller's privilege, validates fqdn / port, refuses an endpoint that already
 * exists, then starts the create transaction. When the audit level is at
 * least AUDIT_LEVEL_SYSTEM an audit record is written whether or not the
 * create call succeeded.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int32_t         lino = 0;
  int64_t         tss = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE), &lino,
                  _OVER);

  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  // An endpoint that is already registered cannot be created again.
  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed handling time in seconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = ((double)(tse - tss)) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1292

1293
// Implementation hook for RESTORE DNODE; a stub is supplied below when
// TD_ENTERPRISE is not defined.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for RESTORE DNODE requests: forwards to the implementation hook.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and reports success.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1300

1301
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,142✔
1302
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1303
  int32_t  code = -1;
9,142✔
1304
  SSdbRaw *pRaw = NULL;
9,142✔
1305
  STrans  *pTrans = NULL;
9,142✔
1306
  int32_t  lino = 0;
9,142✔
1307

1308
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,142✔
1309
  if (pTrans == NULL) {
9,142✔
1310
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1311
    if (terrno != 0) code = terrno;
×
1312
    goto _OVER;
×
1313
  }
1314
  mndTransSetGroupParallel(pTrans);
9,142✔
1315
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,142✔
1316
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,142✔
1317

1318
  pRaw = mndDnodeActionEncode(pDnode);
9,142✔
1319
  if (pRaw == NULL) {
9,142✔
1320
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1321
    if (terrno != 0) code = terrno;
×
1322
    goto _OVER;
×
1323
  }
1324
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,142✔
1325
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,142✔
1326
  pRaw = NULL;
9,142✔
1327

1328
  pRaw = mndDnodeActionEncode(pDnode);
9,142✔
1329
  if (pRaw == NULL) {
9,142✔
1330
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1331
    if (terrno != 0) code = terrno;
×
1332
    goto _OVER;
×
1333
  }
1334
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,142✔
1335
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,142✔
1336
  pRaw = NULL;
9,142✔
1337

1338
  if (pSObj != NULL) {
9,142✔
1339
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
526✔
1340
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
526✔
1341
  }
1342

1343
  if (pMObj != NULL) {
9,142✔
1344
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
224✔
1345
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
224✔
1346
  }
1347

1348
  if (pQObj != NULL) {
9,142✔
1349
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
140✔
1350
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
140✔
1351
  }
1352

1353
  if (pBObj != NULL) {
9,142✔
1354
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
747✔
1355
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
747✔
1356
  }
1357

1358
  if (numOfVnodes > 0) {
8,395✔
1359
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
6,973✔
1360
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
6,973✔
1361
  }
1362

1363
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,395✔
1364

1365
  code = 0;
8,395✔
1366

1367
_OVER:
9,142✔
1368
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,142✔
1369
  mndTransDrop(pTrans);
9,142✔
1370
  sdbFreeRaw(pRaw);
9,142✔
1371
  TAOS_RETURN(code);
9,142✔
1372
}
1373

1374
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The lookups run in that order and stop at the first hit.
 *
 * Returns true only when every lookup came back empty.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       isEmpty = false;
  SMnodeObj *pMObj = NULL;
  SQnodeObj *pQObj = NULL;
  SSnodeObj *pSObj = NULL;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSObj == NULL) {
      pMObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMObj == NULL) {
        int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
        isEmpty = !(numOfVnodes > 0);
      }
    }
  }

  // Each release is called unconditionally, including for objects that were never acquired.
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1399

1400
// Handles a "drop dnode" request.
//
// Flow: deserialize the request, check the operation privilege, resolve the target dnode
// (by id first, then by "fqdn:port" endpoint), refuse the drop when preconditions fail
// (last mnode, leader mnode, mounts present, forced drop of an online dnode, offline
// vnode/dnode without force), then hand over to mndDropDnode() and write an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropDnode() succeeded, otherwise the error
// code of the step that failed. All acquired objects and the request are released at _OVER.
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although a dnode is dropped - confirm.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);

  // An unsafe drop implies a forced drop.
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Keep the id-lookup error: it is what gets reported if the endpoint lookup fails too.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a literal format; the request's fqdn must never be used as the format string.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually resolved, so that a lookup by endpoint
  // inspects the same dnode as a lookup by id.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  // A forced drop is only accepted for a dnode that is offline.
  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  mError("vnode num:%d", numOfVnodes);

  // Scan all vgroups for a replica on this dnode whose sync state is offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

    // Duration is reported in seconds; the timestamps are in milliseconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1536

1537
// Sends a "create encrypt key" request to dnodes.
//
// pReq      original client request (source of the mnode handle and trace id)
// dnodeId   target dnode; -1 or 0 means every dnode
// pDcfgReq  config/value pair to forward; the value is the key and must have a length in
//           [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]
//
// Only one such operation may run at a time: the encryptMgmt.encrypting flag is taken with a
// compare-exchange and cleared again here when no request could be sent (otherwise the
// response handler is expected to clear it). Dnodes that are not online are skipped.
//
// Returns 0 on success or an error code; on error terrno is set if it was still 0.
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Take the "encrypting" flag; a non-zero previous value means another request is in flight.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so it is still ours to free.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Leaving the iteration early: cancel the iterator, then fall through to the
          // check below so the "encrypting" flag is cleared when nothing was sent.
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the flag, so clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1614

1615
// Entry point for a "create encrypt key" request.
//
// When TD_ENTERPRISE or TD_ASTRA_TODO is defined: deserializes the config request, checks the
// MND_OPER_CONFIG_DNODE privilege and, if the config name is "encrypt_key", forwards the
// name/value pair to mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
// TSDB_CODE_MND_INTERNAL_ERROR. In other builds the function returns 0 without doing anything.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    // Take everything still needed out of cfgReq before it is freed.
    const int32_t dnodeId = cfgReq.dnodeId;
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1644

1645
// Handles one dnode's response to a "create encrypt key" request.
//
// Bumps the success or failure counter according to pRsp->code and clears the
// encryptMgmt.encrypting flag once the number of responses (successes plus failures)
// reaches the number of requests that were sent. Always returns 0.
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Update the counter for this response and read the other one as well: completion has to
  // be judged on the total of both, otherwise a mix of successes and failures never
  // reaches nReq and the "encrypting" flag stays set.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1669

1670
// Fills pBlock with one row per reported config item: column 0 is the option name,
// column 1 its current value rendered as text. The item list is fixed (status interval,
// timezone, locale, charset, monitor settings and slow-log settings).
//
// Adds the number of rows written to pShow->numOfRows and returns that number. If setting
// a cell fails, the error is logged and the rows written so far are still reported.
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *optNames[TSDB_CONFIG_NUMBER] = {0};
  char    optValues[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t nOpts = 0;
  int32_t nRows = 0;
  int32_t code = 0;
  int32_t lino = 0;

  optNames[nOpts] = "statusIntervalMs";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
  ++nOpts;

  optNames[nOpts] = "timezone";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  ++nOpts;

  optNames[nOpts] = "locale";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  ++nOpts;

  optNames[nOpts] = "charset";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  ++nOpts;

  optNames[nOpts] = "monitor";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  ++nOpts;

  optNames[nOpts] = "monitorInterval";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  ++nOpts;

  optNames[nOpts] = "slowLogThreshold";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  ++nOpts;

  optNames[nOpts] = "slowLogMaxLen";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  ++nOpts;

  // The slow-log scope is rendered to text by a helper before it is stored.
  char scopeText[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeText);
  optNames[nOpts] = "slowLogScope";
  (void)snprintf(optValues[nOpts], TSDB_CONFIG_VALUE_LEN, "%s", scopeText);
  ++nOpts;

  // Scratch buffers holding the var-string form of each name and value.
  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t idx = 0; idx < nOpts; ++idx) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, optNames[idx], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, nRows, (const char *)nameVar, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, optValues[idx], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, nRows, (const char *)valueVar, false), &lino, _OVER);

    ++nRows;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nRows;
  return nRows;
}
1741

1742
// Intentionally empty: nothing is done with either argument.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1743

1744
// Fills pBlock with up to `rows` dnode rows, continuing from pShow->pIter.
//
// Columns, in order: id, endpoint, number of vnodes, number of supported vnodes, status,
// create time, reboot time, offline reason and (TD_ENTERPRISE builds only) machine id.
// The status text is derived from the sdb object status and from whether the dnode is
// online; offline dnodes are shown as "offline", or with a trailing '*' when the object
// is being created/dropped.
//
// Adds the number of rows written to pShow->numOfRows and returns that number. A failure
// while setting a cell is logged and ends the retrieval with the rows written so far.
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // The offline reason is only shown for dnodes that are not online.
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
    if (b == NULL) {
      // Route the allocation failure through the same failure handling as every other column.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the scratch buffer before the result is checked, so a failure cannot leak it.
    int32_t reasonCode = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(reasonCode, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1826

1827
// Cancels a pending SDB_DNODE iteration identified by pIter.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1831

1832
// Collects a duplicated copy of every dnode's fqdn.
//
// Returns an array of `char *`; the caller owns the array and each string in it.
// Returns NULL when the array cannot be created or when duplicating an fqdn fails (every
// string collected so far is freed in that case). An fqdn that cannot be appended to the
// array is dropped and the scan continues.
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error first, and make sure a failure is never reported as success.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      // Log while the object is still held; pObj must not be touched after sdbRelease().
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      // Leaving the iteration early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy would otherwise be lost.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1874

1875
// Answers a dnode's key-sync request.
//
// With TD_ENTERPRISE and TD_HAS_TAOSK: compares the key version reported by the dnode with
// the mnode's own (0 when no keys are loaded). On mismatch the response carries the keys,
// the algorithm and the timestamps with needUpdate = 1; otherwise only the version with
// needUpdate = 0. In other builds the response always has keyVersion = 0, needUpdate = 0.
//
// On success the serialized response is attached to pReq->info.rsp / rspLen.
// Returns 0 on success or an error code.
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Snapshot of the mnode's keys; everything stays zero/empty when no keys are loaded.
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
  int32_t algorithm = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
    keyVersion = tsEncryptKeyVersion;
    tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
    algorithm = tsEncryptAlgorithmType;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
  } else {
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
  }

  // Check if dnode needs update
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);

    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 0;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass computes the encoded size, second pass writes into the rpc buffer.
  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
1985

1986
// Key-sync responses need no processing: the message is ignored and 0 is returned.
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
×
1987

1988
// Maps an sdb object of the given type to the dnode object it refers to.
// A dnode maps to itself; a qnode, snode or bnode maps to its pDnode member.
// Returns NULL for a NULL object or for any other type.
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pResult = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pResult = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pResult = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pResult = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pResult = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pResult;
}
2005
// Appends the endpoint set of every sdb object of `type` to pAddr (an array of SEpSet).
// Objects that cannot be mapped to a dnode are logged and skipped.
// Returns 0 on success, or an error code when an endpoint set cannot be appended; entries
// appended before the failure stay in pAddr.
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Always leave with a non-zero code and release the object exactly once,
      // whether or not terrno was set by the failed push.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // Leaving the iteration early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2035

2036
// Collects the endpoint sets of all qnodes, snodes, bnodes and dnodes, in that order,
// into pAddr. Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
// code of the first type whose collection fails, or 0.
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType kNodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  const int32_t         numTypes = (int32_t)(sizeof(kNodeTypes) / sizeof(kNodeTypes[0]));

  int32_t code = 0;
  int32_t lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  for (int32_t t = 0; t < numTypes; ++t) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, kNodeTypes[t], pAddr), &lino, _error);
  }

_error:
  return code;
}
2058

2059
// Handles a "reload TLS config" request: after the privilege check, sends an empty
// TDMT_DND_RELOAD_DNODE_TLS message to every endpoint set collected by mndGetAllNodeAddr().
//
// Sending is best effort: a failed send is logged and the remaining endpoints are still
// contacted. Returns an error code when deserialization, the privilege check or the
// address collection fails; otherwise the result of the last send (0 when there was
// nothing to send).
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq req = {0};
  // Initialized up front: every `goto _OVER` below ends in taosArrayDestroy(pAddr).
  SArray       *pAddr = NULL;

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Do not broadcast from an incomplete address list.
  code = mndGetAllNodeAddr(pMnode, pAddr);
  if (code != 0) {
    mError("failed to collect node addrs for tls reload since %s", tstrerror(code));
    goto _OVER;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    code = tmsgSendReq(pEpSet, &rpcMsg);
    if (code != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2097

2098
// Logs the outcome of a dnode's TLS reload; the response code is not propagated
// and the function always returns 0.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }
  return 0;
}
2107

2108
// Validates an "alter encrypt key" request and forwards it to every online dnode.
//
// pAlterReq->keyType must be 0 (SVR_KEY) or 1 (DB_KEY) and the new key length must lie in
// [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]. The forwarded copy carries the key type and the
// new key only (no sql text). Offline dnodes are skipped and a failed send is logged without
// stopping the loop.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA / TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN for
// invalid input, or the serialization result when encoding the request fails.
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
    void   *pBuf = rpcMallocCont(bufLen);

    if (pBuf != NULL) {
      if ((bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, bufLen, &alterKeyReq)) <= 0) {
        code = bufLen;
        // The buffer was never handed to the transport, so it is still ours to free.
        rpcFreeCont(pBuf);
        sdbRelease(pSdb, pDnode);
        // Leaving the iteration early: the iterator has to be cancelled explicitly.
        sdbCancelFetch(pSdb, pIter);
        goto _exit;
      }
      // NOTE(review): the message sent to dnodes uses a TDMT_MND_* type - confirm this is intended.
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
      if (ret != 0) {
        mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
      } else {
        mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2183

2184
// Entry point for an "alter encrypt key" request.
//
// Order of work: privilege check (MND_OPER_CONFIG_DNODE), request deserialization, then -
// only with TD_ENTERPRISE and TD_HAS_TAOSK - distribution to the dnodes followed by an
// audit record on success. Other builds answer TSDB_CODE_OPS_NOT_SUPPORT.
// The request structure is freed on every path.
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              lino = 0;
  int32_t              code = TSDB_CODE_SUCCESS;
  SMAlterEncryptKeyReq alterReq = {0};
  SMnode              *pMnode = pReq->info.node;

  // Only a caller holding the config-dnode privilege may change encryption keys.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Decode the request body.
  if ((code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Distribute to the dnodes; an audit record is written only when that succeeded.
  if ((code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq)) == 0) {
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", alterReq.keyType == 0 ? "SVR_KEY" : "DB_KEY",
                alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Community edition: encryption key management is unavailable.
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&alterReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc