• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.86
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
/* Soft version stamped on encoded SDB_DNODE rows (decode accepts 1..this value). */
#define TSDB_DNODE_VER_NUMBER   2
/* Zero bytes reserved in every encoded dnode row for future fields. */
#define TSDB_DNODE_RESERVE_SIZE 40

/*
 * Human-readable text for each dnode offline reason. The entries appear to be
 * ordered to match the DND_REASON_* codes (0 == online, "") — keep the two in
 * sync when adding a reason.
 */
static const char *offlineReason[] = {
    /* 0  */ "",
    /* 1  */ "status msg timeout",
    /* 2  */ "status not received",
    /* 3  */ "version not match",
    /* 4  */ "dnodeId not match",
    /* 5  */ "clusterId not match",
    /* 6  */ "statusInterval not match",
    /* 7  */ "timezone not match",
    /* 8  */ "locale not match",
    /* 9  */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};

/* Active-code kinds. */
enum { DND_ACTIVE_CODE, DND_CONN_ACTIVE_CODE };

/* Dnode operation kinds. */
enum { DND_CREATE, DND_ADD, DND_DROP };
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq);
110

111
#ifdef _GRANT
112
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
113
#else
114
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
115
#endif
116

117
/*
 * Register the dnode module with the mnode: RPC message handlers, show
 * (retrieve / iterator-cancel) handlers and, last, the SDB_DNODE table
 * callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Dnode management, status, statistics, audit and encryption-key messages.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  // System-table views backed by this module.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // Key synchronisation and key-lifecycle messages.
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_KEY_EXPIRATION, mndProcessAlterKeyExpirationReq);

  // SDB_DNODE rows are keyed by the int32 dnode id.
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
156

157
/* Counterpart of mndInitDnode(); that function allocates nothing, so there is nothing to release. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
402,026✔
158

159
/*
 * SDB deploy callback: on first deployment, create dnode 1 describing the
 * local endpoint (tsLocalFqdn:tsServerPort) through a "create-dnode"
 * transaction.
 *
 * Returns 0 on success, otherwise an error code. In enterprise builds without
 * GRANTS_CFG a missing machine id is an error (TSDB_CODE_DNODE_NO_MACHINE_CODE).
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // The machine id is mandatory here. Set `code` explicitly: tGetMachineId()
    // may have returned 0, which would otherwise be reported as success.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  // Set the status while pRaw is still exclusively ours. Once it is appended to
  // the transaction (after which pRaw is cleared below), no failure path here
  // may free it, or it would be freed twice.
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
210

211
/*
 * Serialize a dnode object into a new SDB raw row (soft version
 * TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserve,
 * then two int16 zeros kept for forward/backward compatibility. The cached
 * `ep` string is not stored.
 *
 * Returns the raw row, or NULL on failure (terrno describes the error).
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t code = 0;
  int32_t lino = 0;
  // Preset so that any early jump to _OVER is treated as a failure; cleared
  // only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_SET_INT32(pRaw, pos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, pos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, pos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, pos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, pos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, pos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, pos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, pos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, pos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, pos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
243

244
/*
 * Deserialize an SDB raw row into a newly allocated dnode SSdbRow.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. Version 2+ rows carry two
 * extra length-prefixed binary fields after the reserve area; they are read
 * with a NULL destination (presumably skipped — see SDB_GET_BINARY).
 * After decoding, tmsgUpdateDnodeInfo() is given the id/fqdn/port; if it
 * reports a change, that is logged.
 *
 * Returns the row, or NULL on failure (terrno describes the error).
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;
  int8_t     sver = 0;

  // Preset so early jumps to _OVER are treated as failures.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_GET_INT32(pRaw, pos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, pos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, pos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, pos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, pos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, pos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, pos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two length-prefixed fields appended in version 2 (the encoder writes zero lengths).
    int16_t fieldLen = 0;
    SDB_GET_INT16(pRaw, pos, &fieldLen, _OVER)
    SDB_GET_BINARY(pRaw, pos, NULL, fieldLen, _OVER)
    SDB_GET_INT16(pRaw, pos, &fieldLen, _OVER)
    SDB_GET_BINARY(pRaw, pos, NULL, fieldLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
295

296
/*
 * SDB insert callback. Marks the dnode as not having reported status yet and
 * rebuilds its cached "fqdn:port" endpoint, which is not part of the encoded row.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  char epBuf[TSDB_EP_LEN] = {0};
  (void)snprintf(epBuf, sizeof(epBuf) - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);

  return 0;
}
305

306
/* SDB delete callback: nothing dnode-specific to release, only traced. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;  // always succeeds
}
310

311
/*
 * SDB update callback. Only updateTime is carried from the new row into the
 * existing one, plus the machine id in TD_ENTERPRISE / TD_ASTRA_TODO builds.
 * All other fields of pOld are left untouched.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
319

320
/*
 * Acquire a reference to dnode `dnodeId`. Release it with mndReleaseDnode().
 *
 * On failure returns NULL and maps the SDB error in terrno to a dnode-specific
 * code:
 *   - SDB_OBJ_NOT_THERE -> MND_DNODE_NOT_EXIST
 *   - SDB_OBJ_CREATING  -> MND_DNODE_IN_CREATING
 *   - SDB_OBJ_DROPPING  -> MND_DNODE_IN_DROPPING
 *   - anything else     -> APP_ERROR (logged as fatal)
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SDnodeObj *pDnode = sdbAcquire(pMnode->pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  int32_t sdbErr = terrno;
  if (sdbErr == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    // Previously said "failed to acquire db" — this function acquires a dnode.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
  }

  return NULL;
}
338

339
/* Drop a dnode reference previously obtained from the SDB (e.g. mndAcquireDnode). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
169,331,515✔
343

344
/*
 * Build an endpoint set holding this dnode's fqdn:port. The status of
 * addEpIntoEpSet() is reported only through terrno.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  return result;
}
349

350
/*
 * Endpoint set for dnode `dnodeId`, or an empty (zeroed) set when the dnode
 * cannot be acquired (terrno is then set by mndAcquireDnode()).
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
360

361
/*
 * Find a dnode whose cached endpoint matches pEpStr (case-insensitive) among
 * the rows visible to sdbFetch(). The returned row carries a reference the
 * caller must release.
 *
 * Returns NULL and sets terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when none matches.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pDnode);
      continue;
    }

    // Match: stop the iteration but keep the reference for the caller.
    sdbCancelFetch(pSdb, pIter);
    return pDnode;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
381

382
/*
 * Like mndAcquireDnodeByEp(), but iterates with sdbFetchAll(), so rows in any
 * SDB status are considered. The returned row carries a reference the caller
 * must release.
 *
 * Returns NULL when nothing matches; unlike mndAcquireDnodeByEp(), terrno is
 * not touched in that case.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus rowStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &rowStatus, true);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pDnode);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pDnode;
  }

  return NULL;
}
402

403
/* Number of rows in the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
407

408
/* Number of rows in the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
412

413
/*
 * A dnode is online when |lastAccessTime - curMs| is within tsStatusTimeoutMs.
 * When it has timed out, a dnode that had booted (rebootTime > 0) and was still
 * marked online gets offlineReason = DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  if (elapsedMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
423

424
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode) per dnode row, in any SDB
 * status, to pDnodeEps. Push failures are logged and the scan continues.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything we need out of the row, then release it. The row must not
    // be dereferenced after sdbRelease() (previously pDnode->id was read after it).
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);
    pDnode = NULL;

    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,167,364✔
450

451
/*
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode) per dnode
 * row, in any SDB status, to pDnodeInfo.
 *
 * Returns 0, or terrno from a failed taosArrayPush() (iteration is then cancelled).
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-init so any SDnodeInfo fields not set below are deterministic.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    // The row must not be dereferenced after release; use the copied id below.
    sdbRelease(pSdb, pDnode);
    pDnode = NULL;

    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
483

484
#define CHECK_MONITOR_PARA(para, err)                                                                    \
485
  if (pCfg->monitorParas.para != para) {                                                                 \
486
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
487
    terrno = err;                                                                                        \
488
    return err;                                                                                          \
489
  }
490

491
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
×
492
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
×
493
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
×
494
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
×
495
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
×
496
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
×
497

498
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
×
499
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
500
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
501
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
502
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
503
  }
504

505
  /*
506
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
507
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
508
           tsStatusIntervalMs);
509
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
510
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
511
  }
512
  */
513

514
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
×
515
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
516
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
517
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
518
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
519
  }
520

521
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
×
522
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
523
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
524
    return DND_REASON_LOCALE_NOT_MATCH;
×
525
  }
526

527
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
×
528
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
529
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
530
    return DND_REASON_CHARSET_NOT_MATCH;
×
531
  }
532

533
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
×
534
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
535
           tsTtlChangeOnWrite);
536
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
537
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
538
  }
539
  int8_t enable = tsEnableWhiteList ? 1 : 0;
×
540
  if (pCfg->enableWhiteList != enable) {
×
541
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
542
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
543
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
544
  }
545

546
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
×
547
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
×
548
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
549
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
550
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
551
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
552
  }
553

554
  return DND_REASON_ONLINE;
×
555
}
556

557
/*
 * Average growth rate of an index counter between two samples, in entries per
 * millisecond. Returns 0.0 unless both the time and the count strictly
 * increased since the previous sample.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t countDelta = currentCount - lastCount;
    int64_t timeDeltaMs = currentTimeMs - lastTimeMs;
    return (double)countDelta / (double)timeDeltaMs;
  }
  return 0.0;
}
567

568
/*
 * Merge a vnode load report (pVload) into the cached replica state (pGid).
 *
 * Always copies the progress counters (applied/commit/total index, buffer usage,
 * learner progress, snapshot seq). While the replica lags (commit > applied) it
 * also maintains the applied-rate estimate.
 *
 * Returns true when the role (sync state, term, role time), restore flag,
 * can-read flag or start time changed; those fields are then updated as well.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // syncTerm == -1 in the report means "term not reported".
  bool termChanged = pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm;
  bool roleChanged =
      pGid->syncState != pVload->syncState || termChanged || pGid->roleTimeMs != pVload->roleTimeMs;

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First lagging sample: only start the clock.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;
  pGid->syncTotalIndex = pVload->syncTotalIndex;

  bool snapshotInProgress = pVload->snapSeq > 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END;
  if (snapshotInProgress || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                 pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
617

618
/*
 * Merge an mnode load report into the cached mnode object.
 *
 * Returns true (after copying sync state, term, restore flag and role time)
 * when the role or the restore flag changed; otherwise returns false and
 * leaves pObj untouched.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // syncTerm == -1 in the report means "term not reported".
  bool termChanged = pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm;
  bool roleChanged =
      pObj->syncState != pMload->syncState || termChanged || pObj->roleTimeMs != pMload->roleTimeMs;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
635

636
extern char   *tsMonFwUri;
637
extern char   *tsMonSlowLogUri;
638
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
54✔
639
  SMnode    *pMnode = pReq->info.node;
54✔
640
  SStatisReq statisReq = {0};
54✔
641
  int32_t    code = -1;
54✔
642

643
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
54✔
644

645
  if (tsMonitorLogProtocol) {
54✔
646
    mInfo("process statis req,\n %s", statisReq.pCont);
54✔
647
  }
648

649
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
54✔
650
    monSendContent(statisReq.pCont, tsMonFwUri);
54✔
651
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
652
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
653
  }
654

655
  tFreeSStatisReq(&statisReq);
54✔
656
  return 0;
54✔
657
}
658

659
/*
 * Handle a single audit record. Ignored unless audit is enabled at
 * AUDIT_LEVEL_DATA or higher. Returns the deserialization error, if any;
 * otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq req = {0};
  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &req));

  mDebug("received audit req:%s, %s, %s, %s", req.operation, req.db, req.table, req.pSql);
  auditAddRecord(pReq, pMnode->clusterId, req.operation, req.db, req.table, req.pSql, req.sqlLen, req.duration,
                 req.affectedRows);

  tFreeSAuditReq(&req);
  return 0;
}
676

677
/*
 * Handle a batch of audit records; each element of auditArr is recorded in
 * order. Ignored unless audit is enabled at AUDIT_LEVEL_DATA or higher.
 * Returns the deserialization error, if any; otherwise 0.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq batch = {0};
  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &batch));

  int32_t count = taosArrayGetSize(batch.auditArr);
  for (int32_t idx = 0; idx < count; ++idx) {
    SAuditReq *pEntry = TARRAY_GET_ELEM(batch.auditArr, idx);
    mDebug("received audit req:%s, %s, %s, %s", pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql);
    auditAddRecord(pReq, pMnode->clusterId, pEntry->operation, pEntry->db, pEntry->table, pEntry->pSql,
                   pEntry->sqlLen, pEntry->duration, pEntry->affectedRows);
  }

  tFreeSBatchAuditReq(&batch);
  return 0;
}
699

700
/*
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message (dnode id + machine id) on the mnode
 * WRITE_QUEUE; mndProcessUpdateDnodeInfoReq later persists the dnode object.
 *
 * @param pMnode  mnode whose msgCb receives the message
 * @param pDnode  dnode whose id and machineId are serialized
 * @return 0 on success, otherwise a negative error code (also stored in terrno)
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer has not been handed to the queue yet, so it is still ours to free.
    // A zero length must not be reported as success (code would otherwise stay 0).
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
730

731
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO.
 *
 * Looks up the dnode named in the request, stamps its updateTime, and writes its
 * current encoding as a READY commit log into a new "update-dnode-obj" rollback
 * transaction, which is then prepared.
 *
 * @return 0 on success, otherwise the first failing step's error code.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  if ((pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  if ((pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj")) ==
      NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  pCommitRaw = NULL;  // the transaction owns the raw from here on

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
777

778
/*
 * Process a status (heartbeat) message from a dnode.
 *
 * Steps:
 *   1. Decode the request and reject dnodes reporting a different cluster id.
 *   2. Resolve the dnode object by id, falling back to its endpoint.
 *   3. Refresh vgroup, mnode and qnode load information carried by the report.
 *   4. When something the dnode must learn about changed (needCheck), validate its
 *      version / cluster id, refresh the cached dnode attributes and attach an
 *      SStatusRsp to pReq->info.rsp.
 *
 * Every rejection path assigns `code` explicitly. The TAOS_CHECK_GOTO on the
 * deserialization overwrites the initial -1 with 0. Without that, a path that
 * only set terrno fell through to mndUpdClusterInfo() and was answered as a
 * success; for example, TSDB_CODE_MND_DNODE_NOT_EXIST was never returned.
 *
 * @param pReq  status message; pReq->info.rsp / rspLen receive the response, if any
 * @return the error code sent back to the dnode, or the result of
 *         mndUpdClusterInfo() when the status was accepted
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is registered under a different dnode id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        code = (err != 0) ? err : TSDB_CODE_MND_RETURN_VALUE_NULL;
        terrno = code;
        goto _OVER;
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        code = err;
        terrno = err;
        goto _OVER;
      }

      pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
      if (pDnode == NULL) {
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditTokenChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  if (tsAuditUseToken) {
    SDbObj *pDb = mndAcquireAuditDb(pMnode);
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
      mndReleaseDb(pMnode, pDb);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = mndGetAuditUser(pMnode, auditUser);
    if (ret != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      // NOTE(review): the token is looked up for the literal user "audit", not for the
      // auditUser resolved just above - confirm this is intended.
      int32_t tokenRet = mndGetUserActiveToken("audit", auditToken);
      if (tokenRet != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken,
               tstrerror(tokenRet));
      } else {
        mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true;
      }
    }
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Clock skew is compared at second granularity. The absolute value is taken in 64 bits:
  // labs() works on `long`, which is only 32 bits wide on LLP64 platforms.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (delta < 0) delta = -delta;
  if (delta >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }

  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    // pDnode is non-NULL from here on: every lookup failure above jumped to _OVER.
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      code = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      terrno = code;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else if (statusReq.clusterId != pMnode->clusterId) {
      pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
      mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
             pMnode->clusterId);
      code = TSDB_CODE_MND_INVALID_CLUSTER_ID;
      terrno = code;
      goto _OVER;
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
            "enableWhiteListChanged:%d auditDBChanged:%d auditTokenChanged:%d pMnode->ipWhiteVer:%" PRId64
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditTokenChanged,
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        terrno = code;
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      terrno = code;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditDB[0] != '\0') {
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
    }
    if (auditToken[0] != '\0') {
      mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken);
      tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
    }

    // Size, allocate, then encode. Release pDnodeEps on every path. Free pHead if it
    // never gets attached to pReq->info.rsp.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1067

1068
/*
 * Handle a dnode notify message carrying lightweight vnode loads.
 *
 * Rejects messages from a different (non-zero) cluster id. Otherwise copies each
 * reported nTimeSeries into the matching vgroup, if that vgroup exists, and
 * refreshes the cluster info. The decoded request is always released.
 *
 * @return 0 / mndUpdClusterInfo() result on success, otherwise an error code.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t localClusterId = mndGetClusterId(pMnode);
  bool    foreignCluster = (notifyReq.clusterId != 0) && (notifyReq.clusterId != localClusterId);
  if (foreignCluster) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t i = 0; i < numOfLoads; ++i) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, i);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      continue;
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1101

1102
/*
 * Build a new dnode object from a create request and commit it (status READY)
 * through a "create-dnode" rollback transaction with global conflict checking.
 *
 * @param pMnode   mnode instance
 * @param pReq     originating RPC message (attached to the transaction)
 * @param pCreate  decoded create request supplying fqdn and port
 * @return 0 when the transaction was prepared, otherwise an error code
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  // Format the endpoint the same way mndProcessCreateDnodeReq formats its lookup key
  // ("%s:%d", full buffer). The previous `TSDB_EP_LEN - 1` bound silently dropped
  // the last character of a maximum-length "fqdn:port", so the stored ep could no
  // longer match endpoint lookups.
  (void)snprintf(dnodeObj.ep, sizeof(dnodeObj.ep), "%s:%d", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  pRaw = NULL;  // owned by the transaction from here on

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1142

1143
/*
 * Reply with the id and single endpoint (fqdn:port) of every dnode in the SDB.
 *
 * On success the serialized SDnodeListRsp is attached to pReq->info.rsp. The
 * response buffer is freed on every failure path where it was not attached.
 *
 * @return 0 on success, otherwise an error code (also stored in terrno)
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Size first and validate before allocating; a non-positive length must not be
  // passed to rpcMallocCont or reported as success.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);  // not attached to pReq->info.rsp yet, so still ours
    pRsp = NULL;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1204

1205
void getSlowLogScopeString(int32_t scope, char *result) {
1,092✔
1206
  if (scope == SLOW_LOG_TYPE_NULL) {
1,092✔
1207
    (void)strncat(result, "NONE", 64);
×
1208
    return;
×
1209
  }
1210
  while (scope > 0) {
2,184✔
1211
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,092✔
1212
      (void)strncat(result, "QUERY", 64);
1,092✔
1213
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,092✔
1214
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1215
      (void)strncat(result, "INSERT", 64);
×
1216
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1217
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1218
      (void)strncat(result, "OTHERS", 64);
×
1219
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1220
    } else {
1221
      (void)printf("invalid slow log scope:%d", scope);
×
1222
      return;
×
1223
    }
1224

1225
    if (scope > 0) {
1,092✔
1226
      (void)strncat(result, "|", 64);
×
1227
    }
1228
  }
1229
}
1230

1231
/*
 * Handle CREATE DNODE.
 *
 * Checks the DNODE and CPU_CORES grants, decodes the request, checks the
 * MND_OPER_CREATE_DNODE privilege, validates fqdn/port, and rejects an endpoint
 * that already exists. It then starts the create-dnode transaction; a successful
 * start is reported as TSDB_CODE_ACTION_IN_PROGRESS and sets tsGrantHBInterval to 5.
 * At AUDIT_LEVEL_SYSTEM or above, the attempt is audited with its elapsed time in
 * seconds.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  int32_t         code = -1;
  int32_t         lino = 0;
  int64_t         tss = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code == 0) {
    code = grantCheck(TSDB_GRANT_CPU_CORES);
  }
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE),
                  &lino, _OVER);

  bool invalidEp = (createReq.fqdn[0] == 0) || (createReq.port <= 0) || (createReq.port > UINT16_MAX);
  if (invalidEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // Note: the fqdn is not checked for resolvability here.

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  if ((pDnode = mndAcquireDnodeByEp(pMnode, ep)) != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    double duration = (double)(taosGetTimestampMs() - tss) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1294

1295
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

/* RESTORE DNODE entry point; all work is delegated to mndProcessRestoreDnodeReqImpl. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
/* Non-enterprise builds: accept the request and do nothing. */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1302

1303
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,208✔
1304
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1305
  int32_t  code = -1;
9,208✔
1306
  SSdbRaw *pRaw = NULL;
9,208✔
1307
  STrans  *pTrans = NULL;
9,208✔
1308
  int32_t  lino = 0;
9,208✔
1309

1310
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,208✔
1311
  if (pTrans == NULL) {
9,208✔
1312
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1313
    if (terrno != 0) code = terrno;
×
1314
    goto _OVER;
×
1315
  }
1316
  mndTransSetGroupParallel(pTrans);
9,208✔
1317
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,208✔
1318
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,208✔
1319

1320
  pRaw = mndDnodeActionEncode(pDnode);
9,208✔
1321
  if (pRaw == NULL) {
9,208✔
1322
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1323
    if (terrno != 0) code = terrno;
×
1324
    goto _OVER;
×
1325
  }
1326
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,208✔
1327
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,208✔
1328
  pRaw = NULL;
9,208✔
1329

1330
  pRaw = mndDnodeActionEncode(pDnode);
9,208✔
1331
  if (pRaw == NULL) {
9,208✔
1332
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1333
    if (terrno != 0) code = terrno;
×
1334
    goto _OVER;
×
1335
  }
1336
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,208✔
1337
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,208✔
1338
  pRaw = NULL;
9,208✔
1339

1340
  if (pSObj != NULL) {
9,208✔
1341
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
386✔
1342
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
386✔
1343
  }
1344

1345
  if (pMObj != NULL) {
9,208✔
1346
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
220✔
1347
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
220✔
1348
  }
1349

1350
  if (pQObj != NULL) {
9,208✔
1351
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
140✔
1352
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
140✔
1353
  }
1354

1355
  if (pBObj != NULL) {
9,208✔
1356
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
750✔
1357
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
750✔
1358
  }
1359

1360
  if (numOfVnodes > 0) {
8,458✔
1361
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
7,042✔
1362
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
7,042✔
1363
  }
1364

1365
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,458✔
1366

1367
  code = 0;
8,458✔
1368

1369
_OVER:
9,208✔
1370
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,208✔
1371
  mndTransDrop(pTrans);
9,208✔
1372
  sdbFreeRaw(pRaw);
9,208✔
1373
  TAOS_RETURN(code);
9,208✔
1374
}
1375

1376
/*
 * Report whether a dnode hosts no qnode, snode, mnode or vnode.
 *
 * Lookups run in that order and stop at the first hit. Every acquired object is
 * released before returning. Bnodes are not consulted.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SQnodeObj *pQObj = mndAcquireQnode(pMnode, dnodeId);
  SSnodeObj *pSObj = NULL;
  SMnodeObj *pMObj = NULL;
  bool       isEmpty = false;

  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
  }
  if (pQObj == NULL && pSObj == NULL) {
    pMObj = mndAcquireMnode(pMnode, dnodeId);
  }
  if (pQObj == NULL && pSObj == NULL && pMObj == NULL) {
    isEmpty = (mndGetVnodesNum(pMnode, dnodeId) <= 0);
  }

  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1401

1402
/*
 * Handle a DROP DNODE request.
 *
 * The target dnode is resolved by id first and, failing that, by the "fqdn:port" endpoint carried in the
 * request. Once resolved, the dnode's own id is used for every follow-up lookup, because dropReq.dnodeId
 * is not meaningful when the dnode was found by endpoint.
 *
 * The request is rejected when the dnode holds the last mnode or the leader mnode. It is also rejected
 * when it has a mount and the drop is not forced. With force/unsafe, the dnode must be offline. Without
 * force, the dnode must be online and none of its vnodes may be reported offline.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropDnode() prepared the transaction, otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): a dnode drop is authorized with MND_OPER_DROP_MNODE; confirm this is the intended privilege.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);

  // An unsafe drop always implies a forced drop.
  bool force = dropReq.force || dropReq.unsafe;

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // fqdn comes from the client: it must be an argument, never the format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the resolved id from here on; dropReq.dnodeId may not identify this dnode (endpoint lookup).
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  // A forced (or unsafe) drop is only allowed for a dnode that is already offline.
  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", dnodeId,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  mDebug("dnode:%d, vnode num:%d", dnodeId, numOfVnodes);

  // Find the first vgroup whose replica on this dnode is reported offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mDebug("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == dnodeId && pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
        vgId = pVgroup->vgId;
        vnodeOffline = true;
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the scan early: the iterator must be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d",
           dnodeId, vgId, numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d",
           dnodeId, numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dnodeId);

    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1538

1539
/*
 * Send a CREATE ENCRYPT_KEY config request to online dnodes.
 *
 * dnodeId selects the targets: -1 or 0 means every online dnode, any other value only the dnode with
 * that id. pMnode->encryptMgmt.encrypting is set via compare-and-swap to reject concurrent runs. It is
 * cleared here when no request ended up in flight; otherwise mndProcessCreateEncryptKeyRsp clears it
 * once all responses have arrived.
 *
 * Returns 0 on success or an error code. terrno is set to the code if it was not already set.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          rpcFreeCont(pBuf);  // never handed to the transport, so still owned here
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);  // leaving the scan early
          break;                        // still run the `encrypting` cleanup below
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No response will ever clear the guard if nothing was sent, so clear it now.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1616

1617
/*
 * Entry point for "create encrypt_key".
 *
 * In builds with TD_ENTERPRISE or TD_ASTRA_TODO, the request is decoded and the caller must hold the
 * config-dnode privilege. The option must be exactly "encrypt_key"; the value is then forwarded to
 * mndProcessCreateEncryptKeyReqImpl. Other builds return 0 without doing anything.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE);
  // Comparing 12 bytes includes the terminator, so only an exact (case-insensitive) match passes.
  if (code == 0 && strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_MND_INTERNAL_ERROR;
  }
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy out what the implementation needs before releasing the decoded request.
  SDCfgDnodeReq dcfgReq = {0};
  int32_t       targetDnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);

  return mndProcessCreateEncryptKeyReqImpl(pReq, targetDnodeId, &dcfgReq);
#else
  TAOS_RETURN(code);
#endif
}
1646

1647
/*
 * Account for one dnode's reply to a CREATE ENCRYPT_KEY request.
 *
 * The success or failure counter is bumped according to pRsp->code. When the combined number of replies
 * reaches the number of requests sent (encryptMgmt.nEncrypt), the `encrypting` guard is cleared so a new
 * run can start. Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Bump this reply's counter and read the other one. Both totals are needed: with mixed results,
  // counting only the current kind would never reach nReq and would leave `encrypting` stuck.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1671

1672
/*
 * Fill the "show configs" block with cluster-wide options read from the mnode's globals.
 *
 * Column 0 gets the option name and column 1 its current value, both as var-strings. All options are
 * emitted in a single call; `rows` is not consulted. If a column write fails, the error is logged and
 * the rows written so far are still reported.
 *
 * Returns the number of rows written; the same count is added to pShow->numOfRows.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  static const char *const optNames[] = {
      "statusIntervalMs", "timezone",         "locale",        "charset",      "monitor",
      "monitorInterval",  "slowLogThreshold", "slowLogMaxLen", "slowLogScope",
  };
  const int32_t optCount = (int32_t)(sizeof(optNames) / sizeof(optNames[0]));

  char    optVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char    scopeStr[64] = {0};
  char    nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char    valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  int32_t rowsWritten = 0;
  int32_t code = 0;
  int32_t lino = 0;

  // optVals[i] must correspond to optNames[i].
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  (void)snprintf(optVals[0], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
  (void)snprintf(optVals[1], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  (void)snprintf(optVals[2], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  (void)snprintf(optVals[3], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  (void)snprintf(optVals[4], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  (void)snprintf(optVals[5], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  (void)snprintf(optVals[6], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  (void)snprintf(optVals[7], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  (void)snprintf(optVals[8], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  for (int32_t row = 0; row < optCount; ++row) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, optNames[row], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, rowsWritten, (const char *)nameVar, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, optVals[row], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, rowsWritten, (const char *)valueVar, false), &lino, _OVER);

    ++rowsWritten;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += rowsWritten;
  return rowsWritten;
}
1743

1744
/* mndRetrieveConfigs keeps no sdb iterator in pShow, so there is nothing to cancel. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1745

1746
/*
 * Fill the "show dnodes" block, resuming the SDB_DNODE scan stored in pShow->pIter.
 *
 * Each row holds: id, endpoint, vnode count, supported vnodes, status, create time, reboot time,
 * offline reason (empty when online) and, in enterprise builds, the machine id. At most `rows` rows are
 * produced per call. The status is "ready", "creating" or "dropping", or "offline", "creating*" or
 * "dropping*" when the dnode is not online.
 *
 * Returns the number of rows written; the same count is added to pShow->numOfRows.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason, written into the existing stack buffer. This avoids an unchecked heap allocation
    // that would leak whenever RETRIEVE_CHECK_GOTO bails out. The index is bounds-checked against the table.
    const char *reason = "";
    if (!online && (size_t)pDnode->offlineReason < sizeof(offlineReason) / sizeof(offlineReason[0])) {
      reason = offlineReason[pDnode->offlineReason];
    }
    STR_WITH_MAXSIZE_TO_VARSTR(buf, reason, sizeof(buf));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1828

1829
/* Abort an unfinished "show dnodes" scan by cancelling its SDB_DNODE iterator. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1833

1834
/*
 * Collect a heap copy of every dnode's fqdn.
 *
 * Returns an SArray of `char *` whose elements and array the caller owns. Returns NULL if the array
 * cannot be created or an fqdn cannot be duplicated; in that case everything collected so far is freed.
 * A failed push is logged and skipped (the copy is freed) without aborting the scan.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      code = terrno;
      // Log while pObj is still held; it must not be touched after sdbRelease.
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);  // leaving the scan early
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      taosMemoryFreeClear(fqdn);  // the array did not take ownership
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1876

1877
/*
 * Answer a dnode's encryption key-sync request.
 *
 * The reply carries the mnode's key version. In builds with TD_ENTERPRISE and TD_HAS_TAOSK, the key
 * material and metadata are included with needUpdate=1 when the dnode's version differs. The keys are
 * empty when the mnode has no loaded keys. Other builds always reply keyVersion=0 and needUpdate=0.
 *
 * On success the serialized SKeySyncRsp is attached to pReq->info.rsp/rspLen. Returns 0 or an error code.
 */
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Snapshot of the mnode's loaded keys; stays empty/zero when keys are not loaded.
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
  int32_t algorithm = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
    keyVersion = tsEncryptKeyVersion;
    tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
    algorithm = tsEncryptAlgorithmType;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
  } else {
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
  }

  // Only ship key material when the dnode's version differs from the mnode's.
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);

    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 0;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
1987

1988
/* The mnode takes no action on key-sync responses; the message is acknowledged with 0. */
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
×
1989

1990
/*
 * Map an sdb object of the given type to the dnode it belongs to.
 *
 * A dnode object maps to itself; qnode/snode/bnode objects map to their pDnode field. NULL input and any
 * other type yield NULL.
 */
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pOwner = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pOwner = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pOwner = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pOwner = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pOwner = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pOwner;
}
2007
/*
 * Append the epset of every object of sdb `type` to pAddr (an SArray of SEpSet).
 *
 * Objects with no resolvable dnode are logged and skipped. If an append fails, the scan stops: the
 * iterator is cancelled and the error is returned. Returns 0 on success.
 */
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Capture the error first; the release/cancel calls below could overwrite terrno.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);  // leaving the scan early
      goto _exit;
    }
    sdbRelease(pSdb, pObj);
  }

_exit:
  return code;
}
2037

2038
/*
 * Append the epsets of all qnodes, snodes, bnodes and dnodes, in that order, to pAddr.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the first per-type failure, or 0.
 */
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  int32_t               code = 0;
  int32_t               lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  for (size_t t = 0; t < sizeof(nodeTypes) / sizeof(nodeTypes[0]); ++t) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, nodeTypes[t], pAddr), &lino, _error);
  }

_error:
  return code;
}
2060

2061
/*
 * Ask every qnode/snode/bnode/dnode endpoint to reload its TLS configuration.
 *
 * The caller needs the MND_OPER_ALTER_DNODE_RELOAD_TLS privilege. Requests are sent best-effort: a
 * failed send is logged and recorded in the return code, and the remaining nodes are still notified.
 * Returns 0 when every send succeeded, otherwise the last error seen.
 */
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SArray       *pAddr = NULL;  // initialized up front: _OVER destroys it on every path
  SMCfgDnodeReq req = {0};

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndGetAllNodeAddr(pMnode, pAddr), NULL, _OVER);

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    int32_t ret = tmsgSendReq(pEpSet, &rpcMsg);
    if (ret != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(ret));
      code = ret;  // remember the failure, keep notifying the rest
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2099

2100
/* Log a dnode's TLS-reload outcome. The result never fails the mnode side; always returns 0. */
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }
  return 0;
}
2109

2110
/*
 * Validate an ALTER ENCRYPT_KEY request and forward it to every online dnode.
 *
 * keyType must be 0 (SVR_KEY) or 1 (DB_KEY). The new key length must lie in
 * [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]. Offline dnodes are skipped with a warning, and a failed send
 * is logged without aborting. A serialization failure stops the scan and is returned.
 *
 * Returns 0 on success or an error code. terrno is set to the code if it was not already set.
 */
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
    void   *pBuf = rpcMallocCont(bufLen);

    if (pBuf != NULL) {
      if ((bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, bufLen, &alterKeyReq)) <= 0) {
        code = bufLen;
        rpcFreeCont(pBuf);  // never handed to the transport, so still owned here
        sdbRelease(pSdb, pDnode);
        sdbCancelFetch(pSdb, pIter);  // leaving the scan early
        goto _exit;
      }
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
      if (ret != 0) {
        mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
      } else {
        mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2185

2186
/*
 * Handle an ALTER ENCRYPT KEY request received by the mnode.
 *
 * The caller must pass the MND_OPER_CONFIG_DNODE privilege check. The RPC body
 * is decoded into an SMAlterEncryptKeyReq. In builds that define both
 * TD_ENTERPRISE and TD_HAS_TAOSK it is handed to
 * mndProcessAlterEncryptKeyReqImpl(); on success an audit record is written,
 * naming the key as "SVR_KEY" for keyType 0 and "DB_KEY" otherwise. All other
 * builds reject the request with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code hit, via TAOS_RETURN.
 */
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              code = TSDB_CODE_SUCCESS;  // name required: TAOS_CHECK_GOTO assigns to `code`
  int32_t              lino = 0;
  SMAlterEncryptKeyReq req = {0};
  SMnode              *pMnode = pReq->info.node;

  // Rotating encryption keys is restricted to users allowed to configure dnodes.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  if ((code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }
  mInfo("received alter encrypt key req, keyType:%d", req.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Distribute the new key to the dnodes, then audit only a successful change.
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &req);
  if (code == TSDB_CODE_SUCCESS) {
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", (req.keyType == 0) ? "SVR_KEY" : "DB_KEY", req.sql,
                req.sqlLen, 0, 0);
  }
#else
  // Builds without TD_ENTERPRISE/TD_HAS_TAOSK have no key management backend.
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  // TSDB_CODE_ACTION_IN_PROGRESS is deliberately not logged as a failure.
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter encrypt key, keyType:%d, since %s", req.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&req);
  TAOS_RETURN(code);
}
2227

NEW
2228
/*
 * Broadcast an ALTER KEY EXPIRATION request to every online dnode.
 *
 * @param pReq      incoming mnode RPC; supplies the mnode (and its sdb), the
 *                  trace id, and the `msg:%p` tag used in log lines.
 * @param pAlterReq decoded request. days must be >= 0 and strategy must be
 *                  non-empty; otherwise TSDB_CODE_INVALID_PARA is returned and
 *                  nothing is sent.
 *
 * Dnodes whose offlineReason is not DND_REASON_ONLINE are skipped with a
 * warning. Delivery is best-effort: a failed buffer allocation or a failed
 * tmsgSendReq() for one dnode is logged and the broadcast moves on. Only an
 * encoding failure aborts the broadcast. In that case an error code is
 * returned, and terrno is set to it if terrno was still 0.
 *
 * @return 0 once every online dnode has been attempted, or an error code.
 */
static int32_t mndProcessAlterKeyExpirationReqImpl(SRpcMsg *pReq, SMAlterKeyExpirationReq *pAlterReq) {
  int32_t                 code = 0;
  SMnode                 *pMnode = pReq->info.node;
  SSdb                   *pSdb = pMnode->pSdb;
  void                   *pIter = NULL;
  int32_t                 contLen = 0;
  int32_t                 numSent = 0;
  int32_t                 numSkipped = 0;
  SMAlterKeyExpirationReq alterReq = {0};

  // Not referenced directly; presumably consumed by the mG* logging macros.
  const STraceId *trace = &pReq->info.traceId;

  // Validate days value
  if (pAlterReq->days < 0) {
    mGError("msg:%p, failed to alter key expiration since invalid days:%d, must be >= 0", pReq, pAlterReq->days);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate strategy
  if (pAlterReq->strategy[0] == '\0') {
    mGError("msg:%p, failed to alter key expiration since empty strategy", pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  // Prepare the copy that is distributed to dnodes; it carries no SQL text.
  alterReq.days = pAlterReq->days;
  tstrncpy(alterReq.strategy, pAlterReq->strategy, sizeof(alterReq.strategy));
  alterReq.sqlLen = 0;
  alterReq.sql = NULL;

  // The payload is identical for every dnode, so size it once and validate the
  // result before using it as an allocation length.
  contLen = tSerializeSMAlterKeyExpirationReq(NULL, 0, &alterReq);
  if (contLen <= 0) {
    code = (contLen < 0) ? contLen : TSDB_CODE_INVALID_MSG;
    mGError("msg:%p, failed to encode alter key_expiration req since %s", pReq, tstrerror(code));
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter key_expiration req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      numSkipped++;
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnode);
    void  *pBuf = rpcMallocCont(contLen);
    if (pBuf == NULL) {
      // Treated like a failed send: report it and continue with the next dnode.
      mGError("msg:%p, failed to alloc alter key_expiration req for dnode:%d", pReq, pDnode->id);
      numSkipped++;
      sdbRelease(pSdb, pDnode);
      continue;
    }

    int32_t encLen = tSerializeSMAlterKeyExpirationReq(pBuf, contLen, &alterReq);
    if (encLen <= 0) {
      code = (encLen < 0) ? encLen : TSDB_CODE_INVALID_MSG;
      // pBuf was never handed to the transport, so it must be freed here.
      rpcFreeCont(pBuf);
      sdbRelease(pSdb, pDnode);
      // Abandoning an sdbFetch() iteration early must release the iterator.
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_KEY_EXPIRATION, .pCont = pBuf, .contLen = encLen};
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter key_expiration req to dnode:%d, error:%s", pReq, pDnode->id,
              tstrerror(ret));
      numSkipped++;
    } else {
      mGInfo("msg:%p, send alter key_expiration req to dnode:%d, days:%d, strategy:%s", pReq, pDnode->id,
             pAlterReq->days, pAlterReq->strategy);
      numSent++;
    }

    sdbRelease(pSdb, pDnode);
  }

  // Report the actual outcome instead of claiming every dnode was reached.
  mGInfo("msg:%p, sent alter key expiration request to %d dnode(s), %d skipped or failed, days:%d, strategy:%s", pReq,
         numSent, numSkipped, pAlterReq->days, pAlterReq->strategy);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2301

NEW
2302
/*
 * Handle an ALTER KEY EXPIRATION request received by the mnode.
 *
 * The caller must pass the MND_OPER_CONFIG_DNODE privilege check. The RPC body
 * is decoded into an SMAlterKeyExpirationReq. In builds that define both
 * TD_ENTERPRISE and TD_HAS_TAOSK it is forwarded to
 * mndProcessAlterKeyExpirationReqImpl(); on success an audit record with the
 * detail "<days> DAYS <strategy>" is written. All other builds reject the
 * request with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code hit, via TAOS_RETURN.
 */
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq) {
  int32_t                 code = TSDB_CODE_SUCCESS;  // name required: TAOS_CHECK_GOTO assigns to `code`
  int32_t                 lino = 0;
  SMAlterKeyExpirationReq req = {0};
  SMnode                 *pMnode = pReq->info.node;

  // Changing key expiration is restricted to users allowed to configure dnodes.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  if ((code = tDeserializeSMAlterKeyExpirationReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
    goto _OVER;
  }
  mInfo("received alter key expiration req, days:%d, strategy:%s", req.days, req.strategy);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Distribute the new policy to the dnodes, then audit only a successful change.
  code = mndProcessAlterKeyExpirationReqImpl(pReq, &req);
  if (code == TSDB_CODE_SUCCESS) {
    char auditDetail[128];
    (void)snprintf(auditDetail, sizeof(auditDetail), "%d DAYS %s", req.days, req.strategy);
    auditRecord(pReq, pMnode->clusterId, "alterKeyExpiration", "", auditDetail, req.sql, req.sqlLen, 0, 0);
  }
#else
  // Builds without TD_ENTERPRISE/TD_HAS_TAOSK have no key management backend.
  mError("key expiration management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  // TSDB_CODE_ACTION_IN_PROGRESS is deliberately not logged as a failure.
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter key expiration, days:%d, strategy:%s, since %s", req.days, req.strategy,
           tstrerror(code));
  }

  tFreeSMAlterKeyExpirationReq(&req);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc