• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.44
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
/*
 * Human-readable text for each dnode offline reason.
 * NOTE(review): presumably indexed by the DND_REASON_* value stored in
 * SDnodeObj.offlineReason; confirm the order against that enum before
 * inserting or reordering entries.
 */
static const char *offlineReason[] = {
    /* online / status reporting */
    "",
    "status msg timeout",
    "status not received",

    /* identity and cluster configuration mismatches */
    "version not match",
    "dnodeId not match",
    "clusterId not match",
    "statusInterval not match",
    "timezone not match",
    "locale not match",
    "charset not match",
    "ttlChangeOnWrite not match",
    "enableWhiteList not match",
    "encryptionKey not match",

    /* monitor configuration mismatches */
    "monitor not match",
    "monitor switch not match",
    "monitor interval not match",
    "monitor slow log threshold not match",
    "monitor slow log sql max len not match",
    "monitor slow log scope not match",

    /* fallback */
    "unknown",
};
63

64
/* Selector for the two kinds of activation code (values made explicit). */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
68

69
/* Dnode operation kinds (values made explicit). */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
91
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
92
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
93
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
94

95
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
96
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
97
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
98
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
99

100
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
101
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
102

103
#ifdef _GRANT
104
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
105
#else
106
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
107
#endif
108

109
/*
 * Initialise dnode management inside the mnode.
 *
 * Fills in the SDB table descriptor for SDB_DNODE, installs the RPC message
 * handlers and SHOW callbacks implemented in this file, and finally registers
 * the table with the SDB.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  // dnode lifecycle and status messages
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);

  // monitoring and audit messages
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);

  // encryption key, dnode info and TLS reload messages
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  // SHOW support: a retrieve callback plus an iterator-release callback per table
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
143

144
/* Counterpart of mndInitDnode(); there is currently nothing to tear down. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
516,478✔
145

146
/*
 * Deploy callback for the SDB_DNODE table: creates dnode 1 from the local
 * fqdn/port and commits it through a "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;
  char    *machineId = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // A missing machine id is fatal here. Keep the error reported by
    // tGetMachineId() if there is one, but never leave with code == 0:
    // previously only terrno was set, so a 0 return from tGetMachineId()
    // would have reported success without creating the dnode.
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free a raw the transaction now holds.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
197

198
/*
 * Serialise a dnode object into a newly allocated SDB raw record
 * (version TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved
 * area, then two zero int16 values kept for forward/backward compatibility.
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t code = 0;  // code/lino are kept in scope for the SDB_SET_* macros
  int32_t lino = 0;

  // Pessimistic default: any early jump to _OVER reports out-of-memory unless
  // a callee has stored a more specific error in terrno.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, offset, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, offset, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, offset, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, offset, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, offset, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, offset, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, offset, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, offset, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, offset, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
230

231
/*
 * Rebuild a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 the two
 * trailing length-prefixed fields are read and discarded.
 *
 * Returns the row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;  // code/lino are kept in scope for the SDB_GET_* macros
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;
  int8_t     sver = 0;

  // Default error for the early exits that do not set a more specific one.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  int32_t offset = 0;
  SDB_GET_INT32(pRaw, offset, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, offset, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, offset, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, offset, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, offset, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, offset, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, offset, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two (length, bytes) pairs follow; their content is not kept.
    int16_t skipLen = 0;
    SDB_GET_INT16(pRaw, offset, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, offset, NULL, skipLen, _OVER)
    SDB_GET_INT16(pRaw, offset, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, offset, NULL, skipLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
282

283
/*
 * SDB insert callback: marks the dnode as "status not received" and derives
 * its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the object.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
292

293
/* SDB delete callback: only traces the event. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  int32_t result = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return result;
}
297

298
/*
 * SDB update callback: copies the mutable fields of pNew into the live row
 * pOld. updateTime is always copied; machineId only in TD_ENTERPRISE or
 * TD_ASTRA_TODO builds. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
306

307
/*
 * Look up a dnode by id and take a reference on it.
 *
 * Returns the dnode, which the caller hands back with mndReleaseDnode(), or
 * NULL on failure. On failure the generic SDB error in terrno is translated
 * into the matching dnode-level error; any other error becomes
 * TSDB_CODE_APP_ERROR and is logged as fatal.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SDnodeObj *pDnode = sdbAcquire(pMnode->pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    // The message used to say "db" (copy-paste); this lookup is for a dnode.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
  }

  return NULL;
}
325

326
/* Hand a dnode reference back to the mnode's SDB via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
162,587,439✔
330

331
/*
 * Build an endpoint set from the dnode's fqdn and port.
 * The result of addEpIntoEpSet() is stored in terrno; the set is returned by
 * value either way.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  int32_t addCode = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = addCode;
  return result;
}
336

337
/*
 * Endpoint set of the dnode with the given id.
 * Returns a zeroed set if the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
347

348
/*
 * Walk the dnodes returned by sdbFetch() and return the first whose endpoint
 * equals pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * The returned dnode keeps the reference taken by sdbFetch(). If nothing
 * matches, returns NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SDnodeObj *pDnode = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode)) != NULL) {
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
      // Stop iterating but keep the reference on the match.
      sdbCancelFetch(pSdb, pIter);
      return pDnode;
    }

    sdbRelease(pSdb, pDnode);
    pDnode = NULL;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
368

369
/*
 * Same lookup as mndAcquireDnodeByEp(), but iterates with sdbFetchAll().
 * Returns NULL if no endpoint matches; terrno is not modified here.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SDnodeObj *pDnode = NULL;
  ESdbStatus objStatus = 0;

  while ((pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true)) != NULL) {
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
      sdbCancelFetch(pSdb, pIter);
      return pDnode;
    }

    sdbRelease(pSdb, pDnode);
    pDnode = NULL;
    objStatus = 0;
  }

  return NULL;
}
389

390
/* Size of the SDB_DNODE table, as reported by sdbGetSize(). */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
394

395
/* Size of the SDB_DB table, as reported by sdbGetSize(). */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
399

400
/*
 * A dnode counts as online when |lastAccessTime - curMs| does not exceed
 * tsStatusTimeoutMs.
 *
 * Side effect: when the timeout is exceeded for a dnode that has a rebootTime
 * and is still marked DND_REASON_ONLINE, its offlineReason is changed to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsedMs = TABS(pDnode->lastAccessTime - curMs);
  if (elapsedMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
410

411
/*
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * Best effort: a failed array push is logged and the iteration continues.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the row before dropping the reference.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,760,927✔
437

438
/*
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * Returns 0 on success. If an array push fails, iteration is cancelled and
 * the terrno left by taosArrayPush() is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialised so that no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
470

471
/*
 * Compare the monitor parameter `para` reported by a dnode
 * (pCfg->monitorParas.para) with the identically named global `para`.
 * On mismatch: log, set terrno to TSDB_CODE_DNODE_INVALID_MONITOR_PARAS and
 * return the offline reason `err` from the enclosing function.
 *
 * Expects `pCfg` and `pDnode` to be in scope at the call site.
 * terrno used to be assigned `err`, which is a DND_REASON_* value rather than
 * an error code. Wrapped in do { } while (0) so the macro is a single statement.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
477

478
/*
 * Compare the cluster configuration reported by a dnode (pCfg) with this
 * node's own settings.
 *
 * Returns DND_REASON_ONLINE when everything matches. Otherwise returns the
 * DND_REASON_* value of the first mismatch found, after logging it and
 * setting terrno.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Monitor parameters: each macro returns from this function on mismatch.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  // statusIntervalMs is not compared: that check was commented out in this file.

  // A timezone mismatch is reported only when checkTime differs as well.
  bool timezoneDiffers = (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0);
  if (timezoneDiffers && (pMnode->checkTime != pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalise the local switch to 0/1 before comparing.
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // The encryption key comparison is skipped while the encrypting flag is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
543

544
/*
 * Items applied per millisecond between two samples.
 *
 * Returns (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0
 * unless both the time and the count have strictly increased.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (!(currentTimeMs > lastTimeMs && currentCount > lastCount)) {
    return 0.0;
  }

  double countDelta = (double)(currentCount - lastCount);
  double elapsedMs = (double)(currentTimeMs - lastTimeMs);
  return countDelta / elapsedMs;
}
554

555
/*
 * Merge the load reported for one vnode replica (pVload) into the mnode's
 * record of that replica (pGid).
 *
 * Index and buffer counters are always refreshed. Sync state, term, restore,
 * canRead, start time and role time are copied (and logged) only when at
 * least one of them differs.
 *
 * Returns true if that second group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // Evaluated first, before any field of pGid is overwritten below.
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleChanged =
      (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);

  // While commit index is ahead of applied index, track the applied rate.
  const bool applyLagging = pVload->syncCommitIndex > pVload->syncAppliedIndex;
  if (applyLagging && pGid->lastSyncAppliedIndexUpdateTime == 0) {
    // First sample: only record the time, no rate can be computed yet.
    pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
  } else if (applyLagging && pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
    int64_t nowMs = taosGetTimestampMs();
    pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                        pGid->lastSyncAppliedIndexUpdateTime);
    pGid->lastSyncAppliedIndexUpdateTime = nowMs;
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  const bool stateChanged = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                            pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!stateChanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
594

595
/*
 * Merge the load reported for an mnode (pMload) into its object (pObj).
 *
 * Sync state, term, restore flag and role time are copied (and logged) only
 * when the role changed or the restore flag differs.
 *
 * Returns true if the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // A reported term of -1 is never treated as a term change.
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleChanged =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
612

613
extern char   *tsMonFwUri;
614
extern char   *tsMonSlowLogUri;
615
/*
 * Handler for TDMT_MND_STATIS: decodes the request and passes its content to
 * monSendContent() with tsMonFwUri (counter type) or tsMonSlowLogUri
 * (slow-log type). Other types are dropped silently.
 *
 * Returns the deserialisation error, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
635

636
/*
 * Handler for TDMT_MND_AUDIT: records a single audit entry.
 *
 * Does nothing unless auditing is enabled and tsAuditLevel is at least
 * AUDIT_LEVEL_DATA. Returns the deserialisation error, otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);

  tFreeSAuditReq(&auditReq);
  return 0;
}
653

NEW
654
/*
 * Handler for TDMT_MND_BATCH_AUDIT: records every audit entry carried in a
 * batch request.
 *
 * Does nothing unless auditing is enabled and tsAuditLevel is at least
 * AUDIT_LEVEL_DATA. Returns the deserialisation error, otherwise 0.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  // The trace text used to be identical to mndProcessAuditReq's (copy-paste),
  // so the two handlers could not be told apart in trace logs.
  mTrace("process batch audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  int32_t nAudit = taosArrayGetSize(auditReq.auditArr);

  for (int32_t i = 0; i < nAudit; ++i) {
    SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, i);
    mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);

    auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
                   audit->duration, audit->affectedRows);
  }

  tFreeSBatchAuditReq(&auditReq);
  return 0;
}
676

677
/*
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue,
 * carrying the dnode's id and machineId.
 *
 * Returns 0 once the message is queued, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer yields the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Treat a zero length as an error too, as the sizing pass above does;
    // previously it produced code == 0 and a silent "success".
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it has to be freed here.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): pReq is not freed when tmsgPutToQueue() fails; this assumes
  // that call releases pCont itself on failure. Confirm against tmsgPutToQueue.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
707

708
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the dnode's updateTime and persist
 * the dnode object through a "update-dnode-obj" transaction.
 *
 * pReq carries a serialized SDnodeInfoReq identifying the dnode.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;  // raw still owned by this function (freed at _exit)
  SSdbRaw      *pAppended = NULL;   // raw already handed over to pTrans

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // Once appended, the raw is reachable through pTrans. Give up our pointer
  // before the status call so that a failure there does not make _exit free it
  // through sdbFreeRaw() as well as through mndTransDrop().
  // NOTE(review): assumes mndTransDrop() releases appended raws -- confirm.
  pAppended = pCommitRaw;
  pCommitRaw = NULL;
  TAOS_CHECK_EXIT(sdbSetRawStatus(pAppended, SDB_STATUS_READY));

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
754

755
/*
 * Handle a status (heartbeat) request sent by a dnode.
 *
 * Steps, in order:
 *   1. deserialize the request and reject a mismatching cluster id;
 *   2. resolve the dnode object by id and/or endpoint;
 *   3. copy the reported vnode / mnode / qnode load into the in-memory objects;
 *   4. when anything relevant changed (needCheck), validate the dnode against
 *      the cluster, refresh its attributes and build an SStatusRsp that is
 *      attached to pReq->info.rsp;
 *   5. update the dnode's access bookkeeping.
 *
 * Returns `code` when it is non-zero at _OVER, otherwise the result of
 * mndUpdClusterInfo(pReq).
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has no id yet: look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      // Remember the lookup error before further lookups can overwrite terrno.
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but registered under a different id.
        // NOTE(review): only terrno is set on this path; `code` keeps the value left by
        // TAOS_CHECK_GOTO above -- confirm the caller still sees this as a failure.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        // NOTE(review): same as above, only terrno carries the error here.
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  SDbObj *pDb = mndAcquireAuditDb(pMnode);
  if (pDb != NULL) {
    tstrncpy(auditDB, pDb->name, TSDB_DB_FNAME_LEN);
    mndReleaseDb(pMnode, pDb);
  }

  if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

  // A full check (and a response body) is only needed when something the dnode
  // reported differs from what the mnode currently holds.
  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Reject dnodes whose clock differs from ours by tsTimestampDeltaLimit seconds or more.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (labs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    goto _OVER;
  }
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are taken from the replica that reports itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role times fall back to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      // NOTE(review): only terrno is set on this path -- confirm the caller sees a failure.
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        // NOTE(review): only terrno is set on this path -- confirm the caller sees a failure.
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      // NOTE(review): only terrno is set on this path -- confirm the caller sees a failure.
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: store it and queue a request to persist the dnode object.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditDB[0] != '\0') {
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
    }
    // TODO dmchen get audit db token

    // Sizing pass: a negative result is an error code and must not be used as an allocation size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    // Never serialize into, or publish as the response, a buffer that failed to allocate.
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // NOTE(review): pHead is not released on this path in the visible code -- confirm whether it should be.
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1013

1014
/*
 * Handle a notify request from a dnode: copy the reported time-series count of
 * every listed vnode into its vgroup object, then refresh the cluster info.
 *
 * Returns the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER when the
 * request belongs to another cluster, or the result of mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};

  int32_t code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
  } else {
    int64_t clusterid = mndGetClusterId(pMnode);
    bool    foreign = (notifyReq.clusterId != 0) && (notifyReq.clusterId != clusterid);

    if (foreign) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
            notifyReq.clusterId, clusterid, tstrerror(code));
    } else {
      int32_t total = taosArrayGetSize(notifyReq.pVloads);
      for (int32_t idx = 0; idx < total; ++idx) {
        SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);

        SVgObj *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
        if (pVg == NULL) continue;  // vgroup not found: nothing to update

        pVg->numOfTimeSeries = pLoad->nTimeSeries;
        mndReleaseVgroup(pMnode, pVg);
      }
      code = mndUpdClusterInfo(pReq);
    }
  }

  // The decoded request is released on every path.
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1047

1048
/*
 * Create a new dnode object from a create request and submit it through a
 * "create-dnode" transaction.
 *
 * pMnode   mnode that owns the sdb
 * pReq     originating RPC message (attached to the transaction)
 * pCreate  fqdn and port of the dnode to create
 *
 * Returns 0 when the transaction was prepared, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;       // raw still owned by this function (freed at _OVER)
  SSdbRaw *pAppended = NULL;  // raw already handed over to pTrans
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  // Once appended, the raw is reachable through pTrans. Give up our pointer
  // before the status call so that a failure there does not make _OVER free it
  // through sdbFreeRaw() as well as through mndTransDrop().
  // NOTE(review): assumes mndTransDrop() releases appended raws -- confirm.
  pAppended = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppended, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1088

1089
/*
 * Handle a dnode-list request: collect the id, fqdn and port of every dnode in
 * the sdb and return them as a serialized SDnodeListRsp in pReq->info.rsp.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    // Each dnode is reported with a single-endpoint epSet.
    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: release the object and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Sizing pass: a negative result is an error code and must not be used as an allocation size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // NOTE(review): pRsp is not released on this path in the visible code -- confirm whether it should be.
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1150

1151
void getSlowLogScopeString(int32_t scope, char *result) {
2,555✔
1152
  if (scope == SLOW_LOG_TYPE_NULL) {
2,555✔
1153
    (void)strncat(result, "NONE", 64);
×
1154
    return;
×
1155
  }
1156
  while (scope > 0) {
5,110✔
1157
    if (scope & SLOW_LOG_TYPE_QUERY) {
2,555✔
1158
      (void)strncat(result, "QUERY", 64);
2,555✔
1159
      scope &= ~SLOW_LOG_TYPE_QUERY;
2,555✔
1160
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1161
      (void)strncat(result, "INSERT", 64);
×
1162
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1163
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1164
      (void)strncat(result, "OTHERS", 64);
×
1165
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1166
    } else {
1167
      (void)printf("invalid slow log scope:%d", scope);
×
1168
      return;
×
1169
    }
1170

1171
    if (scope > 0) {
2,555✔
1172
      (void)strncat(result, "|", 64);
×
1173
    }
1174
  }
1175
}
1176

1177
/*
 * Handle a CREATE DNODE request.
 *
 * Checks the dnode and CPU-core grants, decodes the request, checks the
 * caller's privilege and the endpoint, refuses an endpoint that already exists
 * and otherwise starts the create transaction. An audit record is written when
 * the audit level is at least AUDIT_LEVEL_SYSTEM.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int32_t         code = -1;
  int32_t         lino = 0;
  int64_t         tss = taosGetTimestampMs();  // start time, used for the audit duration

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) goto _OVER;
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) goto _OVER;

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), &lino, _OVER);

  // The endpoint needs a non-empty fqdn and a port in (0, UINT16_MAX].
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed time in seconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1240

1241
// Implementation hook; a stub is provided below when TD_ENTERPRISE is not defined.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for the restore-dnode request: forwards to the implementation hook.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and reports success.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1248

1249
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
13,329✔
1250
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1251
  int32_t  code = -1;
13,329✔
1252
  SSdbRaw *pRaw = NULL;
13,329✔
1253
  STrans  *pTrans = NULL;
13,329✔
1254
  int32_t  lino = 0;
13,329✔
1255

1256
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
13,329✔
1257
  if (pTrans == NULL) {
13,329✔
1258
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1259
    if (terrno != 0) code = terrno;
×
1260
    goto _OVER;
×
1261
  }
1262
  mndTransSetGroupParallel(pTrans);
13,329✔
1263
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
13,329✔
1264
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
13,329✔
1265

1266
  pRaw = mndDnodeActionEncode(pDnode);
13,329✔
1267
  if (pRaw == NULL) {
13,329✔
1268
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1269
    if (terrno != 0) code = terrno;
×
1270
    goto _OVER;
×
1271
  }
1272
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
13,329✔
1273
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
13,329✔
1274
  pRaw = NULL;
13,329✔
1275

1276
  pRaw = mndDnodeActionEncode(pDnode);
13,329✔
1277
  if (pRaw == NULL) {
13,329✔
1278
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1279
    if (terrno != 0) code = terrno;
×
1280
    goto _OVER;
×
1281
  }
1282
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
13,329✔
1283
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
13,329✔
1284
  pRaw = NULL;
13,329✔
1285

1286
  if (pSObj != NULL) {
13,329✔
1287
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,161✔
1288
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
1,161✔
1289
  }
1290

1291
  if (pMObj != NULL) {
13,329✔
1292
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
653✔
1293
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
653✔
1294
  }
1295

1296
  if (pQObj != NULL) {
13,329✔
1297
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
360✔
1298
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
360✔
1299
  }
1300

1301
  if (pBObj != NULL) {
13,329✔
1302
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,349✔
1303
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
1,349✔
1304
  }
1305

1306
  if (numOfVnodes > 0) {
11,980✔
1307
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,642✔
1308
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,642✔
1309
  }
1310

1311
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
11,980✔
1312

1313
  code = 0;
11,980✔
1314

1315
_OVER:
13,329✔
1316
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
13,329✔
1317
  mndTransDrop(pTrans);
13,329✔
1318
  sdbFreeRaw(pRaw);
13,329✔
1319
  TAOS_RETURN(code);
13,329✔
1320
}
1321

1322
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The lookups run in that order and stop at the first one that finds
 * something.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SQnodeObj *pQnode = NULL;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  bool       empty = false;

  pQnode = mndAcquireQnode(pMnode, dnodeId);
  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodes = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodes > 0);
      }
    }
  }

  // Every release is issued unconditionally, including for pointers that stayed NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1347

1348
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
15,382✔
1349
  SMnode       *pMnode = pReq->info.node;
15,382✔
1350
  int32_t       code = -1;
15,382✔
1351
  SDnodeObj    *pDnode = NULL;
15,382✔
1352
  SMnodeObj    *pMObj = NULL;
15,382✔
1353
  SQnodeObj    *pQObj = NULL;
15,382✔
1354
  SSnodeObj    *pSObj = NULL;
15,382✔
1355
  SBnodeObj    *pBObj = NULL;
15,382✔
1356
  SDropDnodeReq dropReq = {0};
15,382✔
1357
  int64_t       tss = taosGetTimestampMs();
15,382✔
1358

1359
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
15,382✔
1360

1361
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
15,382✔
1362
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1363
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
15,382✔
1364

1365
  bool force = dropReq.force;
15,017✔
1366
  if (dropReq.unsafe) {
15,017✔
1367
    force = true;
×
1368
  }
1369

1370
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
15,017✔
1371
  if (pDnode == NULL) {
15,017✔
1372
    int32_t err = terrno;
×
1373
    char    ep[TSDB_EP_LEN + 1] = {0};
×
1374
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
1375
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
1376
    if (pDnode == NULL) {
×
1377
      code = err;
×
1378
      goto _OVER;
×
1379
    }
1380
  }
1381

1382
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
15,017✔
1383
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
15,017✔
1384
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
15,017✔
1385
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
15,017✔
1386
  if (pMObj != NULL) {
15,017✔
1387
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
2,341✔
1388
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
1,035✔
1389
      goto _OVER;
1,035✔
1390
    }
1391
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
1,306✔
1392
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
653✔
1393
      goto _OVER;
653✔
1394
    }
1395
  }
1396

1397
#ifdef USE_MOUNT
1398
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
13,329✔
1399
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
1400
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1401
    goto _OVER;
×
1402
  }
1403
#endif
1404

1405
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
13,329✔
1406
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
13,329✔
1407

1408
  if (isonline && force) {
13,329✔
1409
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1410
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1411
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1412
    goto _OVER;
×
1413
  }
1414

1415
  mError("vnode num:%d", numOfVnodes);
13,329✔
1416

1417
  bool    vnodeOffline = false;
13,329✔
1418
  void   *pIter = NULL;
13,329✔
1419
  int32_t vgId = -1;
13,329✔
1420
  while (1) {
25,082✔
1421
    SVgObj *pVgroup = NULL;
38,411✔
1422
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
38,411✔
1423
    if (pIter == NULL) break;
38,411✔
1424

1425
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
76,232✔
1426
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
51,150✔
1427
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
51,150✔
1428
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
17,529✔
1429
          vgId = pVgroup->vgId;
×
1430
          vnodeOffline = true;
×
1431
          break;
×
1432
        }
1433
      }
1434
    }
1435

1436
    sdbRelease(pMnode->pSdb, pVgroup);
25,082✔
1437

1438
    if (vnodeOffline) {
25,082✔
1439
      sdbCancelFetch(pMnode->pSdb, pIter);
×
1440
      break;
×
1441
    }
1442
  }
1443

1444
  if (vnodeOffline && !force) {
13,329✔
1445
    code = TSDB_CODE_VND_VNODE_OFFLINE;
×
1446
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
×
1447
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1448
    goto _OVER;
×
1449
  }
1450

1451
  if (!isonline && !force) {
13,329✔
1452
    code = TSDB_CODE_DNODE_OFFLINE;
×
1453
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
×
1454
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1455
    goto _OVER;
×
1456
  }
1457

1458
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
13,329✔
1459
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
13,329✔
1460

1461
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
13,329✔
1462
    char obj1[30] = {0};
13,329✔
1463
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
13,329✔
1464

1465
    int64_t tse = taosGetTimestampMs();
13,329✔
1466
    double  duration = (double)(tse - tss);
13,329✔
1467
    duration = duration / 1000;
13,329✔
1468
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
13,329✔
1469
  }
1470

1471
_OVER:
15,382✔
1472
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
15,382✔
1473
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
3,402✔
1474
  }
1475

1476
  mndReleaseDnode(pMnode, pDnode);
15,382✔
1477
  mndReleaseMnode(pMnode, pMObj);
15,382✔
1478
  mndReleaseQnode(pMnode, pQObj);
15,382✔
1479
  mndReleaseBnode(pMnode, pBObj);
15,382✔
1480
  mndReleaseSnode(pMnode, pSObj);
15,382✔
1481
  tFreeSDropDnodeReq(&dropReq);
15,382✔
1482
  TAOS_RETURN(code);
15,382✔
1483
}
1484

1485
// Broadcast a "create encrypt_key" config request to the selected dnode(s).
//
// pReq     - incoming request; pReq->info.node is the mnode.
// dnodeId  - target dnode id; -1 or 0 selects every dnode that is currently online.
// pDcfgReq - request forwarded to each dnode; pDcfgReq->value carries the key.
//
// Returns 0 on success, otherwise an error code (also copied into terrno when terrno is still 0).
// While requests are outstanding encryptMgmt.encrypting stays 1; it is cleared here when no
// request could be dispatched.
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  // NOTE(review): presumably picked up implicitly by the mG* logging macros - confirm.
  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Only one create-key operation may be in flight: claim the flag with a 0 -> 1 compare-exchange.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);  // first pass only computes the size
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so it must be freed here.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          // Leaving the iteration early: give the iterator back to the sdb.
          sdbCancelFetch(pSdb, pIter);
          // break (not goto _exit) so the check below can clear the encrypting flag
          // when nothing has been dispatched yet.
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request is outstanding, so no response will ever clear the flag: clear it now.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1562

1563
// Handle a client "create encrypt_key" request.
//
// In enterprise builds: deserialize the request, check the CONFIG_DNODE privilege, verify that the
// config name is exactly "encrypt_key" and forward the key to the dnodes.
// In other builds the function returns 0 without doing anything.
//
// Returns 0 on success or an error code.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  SDCfgDnodeReq dcfgReq = {0};

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq)) != 0) {
    // Release whatever the failed deserialization may already have allocated.
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 includes the terminating NUL, so this is a full (case-insensitive) match.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy everything still needed out of cfgReq before it is freed.
  int32_t dnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);

  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1592

1593
// Handle one dnode's response to a create-encrypt-key request.
//
// Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code and, once the number
// of responses reaches the number of dispatched requests (encryptMgmt.nEncrypt), clears the
// encryptMgmt.encrypting flag so a new operation may start.
//
// Always returns 0.
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Increment the matching counter and read the other one as well: completion depends on the
  // sum of both, not only on the counter touched by this response.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1617

1618
// Fill pBlock with a fixed set of (name, value) configuration rows.
//
// Column 0 receives the option name and column 1 its value rendered as text. The `rows`
// argument is not consulted. The number of rows written is added to pShow->numOfRows and
// returned; a failed column write stops the output at the rows completed so far.
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  int32_t nCfg = 0;
  int32_t numOfRows = 0;
  int32_t code = 0;
  int32_t lino = 0;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  // Collect the option names and their textual values.
  cfgOpts[nCfg] = "statusIntervalMs";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);

  cfgOpts[nCfg] = "timezone";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[nCfg] = "locale";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[nCfg] = "charset";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[nCfg] = "monitor";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[nCfg] = "monitorInterval";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[nCfg] = "slowLogThreshold";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[nCfg] = "slowLogMaxLen";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[nCfg] = "slowLogScope";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  // Emit one row per collected option; numOfRows doubles as the row index being written.
  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (numOfRows = 0; numOfRows < nCfg; numOfRows++) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[numOfRows], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, numOfRows, (const char *)nameBuf, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[numOfRows], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, numOfRows, (const char *)valueBuf, false), &lino, _OVER);
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1689

1690
// Intentionally a no-op: both arguments are ignored.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1691

1692
// Fill pBlock with one row per dnode, resuming from pShow->pIter.
//
// At most `rows` rows are produced per call. Columns, in order: id, endpoint, vnode count,
// supported vnodes, status text, create time, reboot time, offline reason and (enterprise builds
// only) machine id. The number of rows written is added to pShow->numOfRows and returned.
//
// NOTE(review): RETRIEVE_CHECK_GOTO is defined outside this chunk; it is assumed to store the
// result in `code`, record the line in `lino`, release pDnode and jump to _OVER on failure.
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status text: the sdb object state, with a distinct spelling when the dnode is not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason (empty while online). Built in the stack buffer with a bounded copy instead
    // of a per-row heap allocation: nothing to NULL-check and nothing to leak when the column
    // write below fails.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    STR_WITH_MAXSIZE_TO_VARSTR(buf, reason, sizeof(buf));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1774

1775
// Cancel an in-progress SDB_DNODE iteration identified by pIter.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1779

1780
// Collect a heap-allocated copy of the fqdn of every dnode.
//
// Returns an array of `char *` (each element created with taosStrdup), or NULL when the array
// cannot be created or an fqdn cannot be duplicated. An fqdn that cannot be appended to the
// array is dropped and the scan continues.
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error first, and log while the object is still referenced.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      // Leaving the iteration early: give the iterator back to the sdb.
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take ownership, so the copy would otherwise leak.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1822

1823
// Map an sdb object of the given type to its dnode object.
//
// SDB_DNODE objects are returned as-is; SDB_QNODE, SDB_SNODE and SDB_BNODE objects yield their
// pDnode member. A NULL object or any other type yields NULL.
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pResult = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pResult = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pResult = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pResult = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pResult = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pResult;
}
1840
// Append the endpoint set of every sdb object of `type` to pAddr.
//
// Objects for which getDnodeObjByType() yields no dnode are logged and skipped.
// Returns 0 on success, or an error code when an element cannot be appended.
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Always report a failure, even if terrno was left at 0; falling through here would
      // release pObj a second time.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // Leaving the iteration early: give the iterator back to the sdb.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
1870

1871
// Append the endpoint sets of all qnodes, snodes, bnodes and dnodes (in that order) to pAddr.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the first non-zero
// result of mndGetAllNodeAddrByType(), or 0.
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};

  int32_t code = 0;
  int32_t lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  for (size_t idx = 0; idx < sizeof(nodeTypes) / sizeof(nodeTypes[0]); ++idx) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, nodeTypes[idx], pAddr), &lino, _error);
  }

_error:
  return code;
}
1893

1894
// Ask every known node endpoint to reload its TLS configuration.
//
// Deserializes the request, checks the ALTER_DNODE_RELOAD_TLS privilege, gathers the endpoint
// sets via mndGetAllNodeAddr() and sends a TDMT_DND_RELOAD_DNODE_TLS message (no payload) to
// each of them. A failed send is logged and the remaining endpoints are still tried.
//
// Returns an error code when setup fails; otherwise the result of the last send (0 when there
// was nothing to send).
// NOTE(review): an earlier send failure is hidden by a later success - confirm this is intended.
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq req = {0};
  // Initialized up front so the cleanup at _OVER is safe on every early exit.
  SArray       *pAddr = NULL;

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = mndGetAllNodeAddr(pMnode, pAddr)) != 0) {
    mError("failed to collect node addrs for tls reload since %s", tstrerror(code));
    goto _OVER;
  }

  int32_t numOfAddrs = (int32_t)taosArrayGetSize(pAddr);
  for (int32_t i = 0; i < numOfAddrs; i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    code = tmsgSendReq(pEpSet, &rpcMsg);
    if (code != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
1933

1934
// Log the outcome of a reload-TLS request reported by a dnode. Always returns 0.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
  } else {
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  }
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc