• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4897

25 Dec 2025 10:17AM UTC coverage: 65.717% (-0.2%) from 65.929%
#4897

push

travis-ci

web-flow
fix: [6622889291] Fix invalid rowSize. (#34043)

186011 of 283047 relevant lines covered (65.72%)

113853896.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.31
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38

39
#define TSDB_DNODE_VER_NUMBER   2
40
#define TSDB_DNODE_RESERVE_SIZE 40
41

42
static const char *offlineReason[] = {
43
    "",
44
    "status msg timeout",
45
    "status not received",
46
    "version not match",
47
    "dnodeId not match",
48
    "clusterId not match",
49
    "statusInterval not match",
50
    "timezone not match",
51
    "locale not match",
52
    "charset not match",
53
    "ttlChangeOnWrite not match",
54
    "enableWhiteList not match",
55
    "encryptionKey not match",
56
    "monitor not match",
57
    "monitor switch not match",
58
    "monitor interval not match",
59
    "monitor slow log threshold not match",
60
    "monitor slow log sql max len not match",
61
    "monitor slow log scope not match",
62
    "unknown",
63
};
64

65
enum {
66
  DND_ACTIVE_CODE,
67
  DND_CONN_ACTIVE_CODE,
68
};
69

70
enum {
71
  DND_CREATE,
72
  DND_ADD,
73
  DND_DROP,
74
};
75

76
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
77
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
78
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
79
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
81
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
82
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
83

84
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
87
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
88
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
90
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
91
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
92
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
93
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
94
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
95

96
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
98
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
99
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
100

101
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
102
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
103

104
#ifdef _GRANT
105
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
106
#else
107
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
108
#endif
109

110
int32_t mndInitDnode(SMnode *pMnode) {
513,738✔
111
  SSdbTable table = {
513,738✔
112
      .sdbType = SDB_DNODE,
113
      .keyType = SDB_KEY_INT32,
114
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
115
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
116
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
117
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
118
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
119
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
120
  };
121

122
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
513,738✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
513,738✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
513,738✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
513,738✔
126
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
513,738✔
127
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
513,738✔
128
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
513,738✔
129
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
513,738✔
130
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
513,738✔
131
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
513,738✔
132
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
513,738✔
133
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
513,738✔
134
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
513,738✔
135
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);
513,738✔
136

137
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
513,738✔
138
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
513,738✔
139
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
513,738✔
140
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
513,738✔
141

142
  return sdbSetTable(pMnode->pSdb, table);
513,738✔
143
}
144

145
void mndCleanupDnode(SMnode *pMnode) {}
513,617✔
146

147
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
335,043✔
148
  int32_t  code = -1;
335,043✔
149
  SSdbRaw *pRaw = NULL;
335,043✔
150
  STrans  *pTrans = NULL;
335,043✔
151

152
  SDnodeObj dnodeObj = {0};
335,043✔
153
  dnodeObj.id = 1;
335,043✔
154
  dnodeObj.createdTime = taosGetTimestampMs();
335,043✔
155
  dnodeObj.updateTime = dnodeObj.createdTime;
335,043✔
156
  dnodeObj.port = tsServerPort;
335,043✔
157
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
335,043✔
158
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
335,043✔
159
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
335,043✔
160
  char *machineId = NULL;
335,043✔
161
  code = tGetMachineId(&machineId);
335,043✔
162
  if (machineId) {
335,043✔
163
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
335,043✔
164
    taosMemoryFreeClear(machineId);
335,043✔
165
  } else {
166
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
167
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
168
    goto _OVER;
×
169
#endif
170
  }
171

172
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
335,043✔
173
  if (pTrans == NULL) {
335,043✔
174
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
175
    if (terrno != 0) code = terrno;
×
176
    goto _OVER;
×
177
  }
178
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
335,043✔
179

180
  pRaw = mndDnodeActionEncode(&dnodeObj);
335,043✔
181
  if (pRaw == NULL) {
335,043✔
182
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
183
    if (terrno != 0) code = terrno;
×
184
    goto _OVER;
×
185
  }
186
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
335,043✔
187
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
335,043✔
188
  pRaw = NULL;
335,043✔
189

190
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
335,043✔
191
  code = 0;
335,043✔
192

193
_OVER:
335,043✔
194
  mndTransDrop(pTrans);
335,043✔
195
  sdbFreeRaw(pRaw);
335,043✔
196
  return code;
335,043✔
197
}
198

199
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
2,998,489✔
200
  int32_t code = 0;
2,998,489✔
201
  int32_t lino = 0;
2,998,489✔
202
  terrno = TSDB_CODE_OUT_OF_MEMORY;
2,998,489✔
203

204
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
2,998,489✔
205
  if (pRaw == NULL) goto _OVER;
2,998,489✔
206

207
  int32_t dataPos = 0;
2,998,489✔
208
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
2,998,489✔
209
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
2,998,489✔
210
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
2,998,489✔
211
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
2,998,489✔
212
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
2,998,489✔
213
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
2,998,489✔
214
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
2,998,489✔
215
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
2,998,489✔
216
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
2,998,489✔
217
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
2,998,489✔
218

219
  terrno = 0;
2,998,489✔
220

221
_OVER:
2,998,489✔
222
  if (terrno != 0) {
2,998,489✔
223
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
224
    sdbFreeRaw(pRaw);
×
225
    return NULL;
×
226
  }
227

228
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
2,998,489✔
229
  return pRaw;
2,998,489✔
230
}
231

232
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
2,245,917✔
233
  int32_t code = 0;
2,245,917✔
234
  int32_t lino = 0;
2,245,917✔
235
  terrno = TSDB_CODE_OUT_OF_MEMORY;
2,245,917✔
236
  SSdbRow   *pRow = NULL;
2,245,917✔
237
  SDnodeObj *pDnode = NULL;
2,245,917✔
238

239
  int8_t sver = 0;
2,245,917✔
240
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
2,245,917✔
241
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
2,245,917✔
242
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
243
    goto _OVER;
×
244
  }
245

246
  pRow = sdbAllocRow(sizeof(SDnodeObj));
2,245,917✔
247
  if (pRow == NULL) goto _OVER;
2,245,917✔
248

249
  pDnode = sdbGetRowObj(pRow);
2,245,917✔
250
  if (pDnode == NULL) goto _OVER;
2,245,917✔
251

252
  int32_t dataPos = 0;
2,245,917✔
253
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
2,245,917✔
254
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
2,245,917✔
255
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
2,245,917✔
256
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
2,245,917✔
257
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
2,245,917✔
258
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
2,245,917✔
259
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
2,245,917✔
260
  if (sver > 1) {
2,245,917✔
261
    int16_t keyLen = 0;
2,245,917✔
262
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
2,245,917✔
263
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
2,245,917✔
264
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
2,245,917✔
265
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
2,245,917✔
266
  }
267

268
  terrno = 0;
2,245,917✔
269
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
2,245,917✔
270
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
271
  }
272

273
_OVER:
2,245,917✔
274
  if (terrno != 0) {
2,245,917✔
275
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
276
    taosMemoryFreeClear(pRow);
×
277
    return NULL;
×
278
  }
279

280
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
2,245,917✔
281
  return pRow;
2,245,917✔
282
}
283

284
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
1,119,462✔
285
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
1,119,462✔
286
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
1,119,462✔
287

288
  char ep[TSDB_EP_LEN] = {0};
1,119,462✔
289
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
1,119,462✔
290
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
1,119,462✔
291
  return 0;
1,119,462✔
292
}
293

294
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
2,245,832✔
295
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
2,245,832✔
296
  return 0;
2,245,832✔
297
}
298

299
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
1,114,016✔
300
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
1,114,016✔
301
  pOld->updateTime = pNew->updateTime;
1,114,016✔
302
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
303
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
1,114,016✔
304
#endif
305
  return 0;
1,114,016✔
306
}
307

308
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
157,109,240✔
309
  SSdb      *pSdb = pMnode->pSdb;
157,109,240✔
310
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
157,109,240✔
311
  if (pDnode == NULL) {
157,109,330✔
312
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
321,566✔
313
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
89,772✔
314
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
231,794✔
315
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
316
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
231,794✔
317
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
231,794✔
318
    } else {
319
      terrno = TSDB_CODE_APP_ERROR;
×
320
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
321
    }
322
  }
323

324
  return pDnode;
157,109,330✔
325
}
326

327
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
158,758,432✔
328
  SSdb *pSdb = pMnode->pSdb;
158,758,432✔
329
  sdbRelease(pSdb, pDnode);
158,758,432✔
330
}
158,758,432✔
331

332
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
10,377,855✔
333
  SEpSet epSet = {0};
10,377,855✔
334
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
10,377,855✔
335
  return epSet;
10,377,855✔
336
}
337

338
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
1,212,404✔
339
  SEpSet     epSet = {0};
1,212,404✔
340
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
1,212,404✔
341
  if (!pDnode) return epSet;
1,212,404✔
342

343
  epSet = mndGetDnodeEpset(pDnode);
1,212,404✔
344

345
  mndReleaseDnode(pMnode, pDnode);
1,212,404✔
346
  return epSet;
1,212,404✔
347
}
348

349
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
1,706,532✔
350
  SSdb *pSdb = pMnode->pSdb;
1,706,532✔
351

352
  void *pIter = NULL;
1,706,532✔
353
  while (1) {
3,370,039✔
354
    SDnodeObj *pDnode = NULL;
5,076,571✔
355
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
5,076,571✔
356
    if (pIter == NULL) break;
5,076,571✔
357

358
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
4,093,039✔
359
      sdbCancelFetch(pSdb, pIter);
723,000✔
360
      return pDnode;
723,000✔
361
    }
362

363
    sdbRelease(pSdb, pDnode);
3,370,039✔
364
  }
365

366
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
983,532✔
367
  return NULL;
983,532✔
368
}
369

370
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
178,385✔
371
  SSdb *pSdb = pMnode->pSdb;
178,385✔
372

373
  void *pIter = NULL;
178,385✔
374
  while (1) {
188,710✔
375
    SDnodeObj *pDnode = NULL;
367,095✔
376
    ESdbStatus objStatus = 0;
367,095✔
377
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
367,095✔
378
    if (pIter == NULL) break;
367,095✔
379

380
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
367,095✔
381
      sdbCancelFetch(pSdb, pIter);
178,385✔
382
      return pDnode;
178,385✔
383
    }
384

385
    sdbRelease(pSdb, pDnode);
188,710✔
386
  }
387

388
  return NULL;
×
389
}
390

391
int32_t mndGetDnodeSize(SMnode *pMnode) {
71,813,858✔
392
  SSdb *pSdb = pMnode->pSdb;
71,813,858✔
393
  return sdbGetSize(pSdb, SDB_DNODE);
71,813,858✔
394
}
395

396
int32_t mndGetDbSize(SMnode *pMnode) {
×
397
  SSdb *pSdb = pMnode->pSdb;
×
398
  return sdbGetSize(pSdb, SDB_DB);
×
399
}
400

401
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
92,115,412✔
402
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
92,115,412✔
403
  if (interval > (int64_t)tsStatusTimeoutMs) {
92,112,293✔
404
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
2,947,603✔
405
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
66,456✔
406
    }
407
    return false;
2,947,629✔
408
  }
409
  return true;
89,164,690✔
410
}
411

412
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
2,672,619✔
413
  SSdb *pSdb = pMnode->pSdb;
2,672,619✔
414

415
  int32_t numOfEps = 0;
2,672,619✔
416
  void   *pIter = NULL;
2,672,619✔
417
  while (1) {
9,786,004✔
418
    SDnodeObj *pDnode = NULL;
12,458,623✔
419
    ESdbStatus objStatus = 0;
12,458,623✔
420
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
12,458,623✔
421
    if (pIter == NULL) break;
12,458,623✔
422

423
    SDnodeEp dnodeEp = {0};
9,786,004✔
424
    dnodeEp.id = pDnode->id;
9,786,004✔
425
    dnodeEp.ep.port = pDnode->port;
9,786,004✔
426
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
9,786,004✔
427
    sdbRelease(pSdb, pDnode);
9,786,004✔
428

429
    dnodeEp.isMnode = 0;
9,786,004✔
430
    if (mndIsMnode(pMnode, pDnode->id)) {
9,786,004✔
431
      dnodeEp.isMnode = 1;
3,941,175✔
432
    }
433
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
9,786,004✔
434
      mError("failed to put ep into array, but continue at this call");
×
435
    }
436
  }
437
}
2,672,619✔
438

439
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
26,865,379✔
440
  SSdb   *pSdb = pMnode->pSdb;
26,865,379✔
441
  int32_t code = 0;
26,865,379✔
442

443
  int32_t numOfEps = 0;
26,865,379✔
444
  void   *pIter = NULL;
26,865,379✔
445
  while (1) {
121,864,276✔
446
    SDnodeObj *pDnode = NULL;
148,729,655✔
447
    ESdbStatus objStatus = 0;
148,729,655✔
448
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
148,729,655✔
449
    if (pIter == NULL) break;
148,729,655✔
450

451
    SDnodeInfo dInfo;
121,863,934✔
452
    dInfo.id = pDnode->id;
121,864,276✔
453
    dInfo.ep.port = pDnode->port;
121,864,276✔
454
    dInfo.offlineReason = pDnode->offlineReason;
121,864,276✔
455
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
121,864,276✔
456
    sdbRelease(pSdb, pDnode);
121,864,276✔
457
    if (mndIsMnode(pMnode, pDnode->id)) {
121,864,276✔
458
      dInfo.isMnode = 1;
37,989,789✔
459
    } else {
460
      dInfo.isMnode = 0;
83,874,487✔
461
    }
462

463
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
121,864,276✔
464
      code = terrno;
×
465
      sdbCancelFetch(pSdb, pIter);
×
466
      break;
×
467
    }
468
  }
469
  TAOS_RETURN(code);
26,865,379✔
470
}
471

472
#define CHECK_MONITOR_PARA(para, err)                                                                    \
473
  if (pCfg->monitorParas.para != para) {                                                                 \
474
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
475
    terrno = err;                                                                                        \
476
    return err;                                                                                          \
477
  }
478

479
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
2,679,743✔
480
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
2,679,743✔
481
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
2,679,743✔
482
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
2,679,743✔
483
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
2,679,743✔
484
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
2,679,743✔
485

486
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
2,679,743✔
487
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
488
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
489
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
490
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
491
  }
492

493
  /*
494
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
495
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
496
           tsStatusIntervalMs);
497
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
498
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
499
  }
500
  */
501

502
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
2,679,743✔
503
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
7,124✔
504
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
505
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
7,124✔
506
    return DND_REASON_TIME_ZONE_NOT_MATCH;
7,124✔
507
  }
508

509
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
2,672,619✔
510
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
511
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
512
    return DND_REASON_LOCALE_NOT_MATCH;
×
513
  }
514

515
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
2,672,619✔
516
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
517
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
518
    return DND_REASON_CHARSET_NOT_MATCH;
×
519
  }
520

521
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
2,672,619✔
522
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
523
           tsTtlChangeOnWrite);
524
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
525
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
526
  }
527
  int8_t enable = tsEnableWhiteList ? 1 : 0;
2,672,619✔
528
  if (pCfg->enableWhiteList != enable) {
2,672,619✔
529
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
530
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
531
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
532
  }
533

534
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
2,672,619✔
535
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
2,672,619✔
536
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
537
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
538
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
539
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
540
  }
541

542
  return DND_REASON_ONLINE;
2,672,619✔
543
}
544

545
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
74,073✔
546
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
74,073✔
547
    return 0.0;
617✔
548
  }
549

550
  int64_t deltaCount = currentCount - lastCount;
73,456✔
551
  int64_t deltaMs = currentTimeMs - lastTimeMs;
73,456✔
552
  double  rate = (double)deltaCount / (double)deltaMs;
73,456✔
553
  return rate;
73,456✔
554
}
555

556
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
104,190,864✔
557
  bool stateChanged = false;
104,190,864✔
558
  bool roleChanged = pGid->syncState != pVload->syncState ||
104,203,952✔
559
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
203,278,707✔
560
                     pGid->roleTimeMs != pVload->roleTimeMs;
99,087,843✔
561

562
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
104,190,864✔
563
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
151,975✔
564
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
72,742✔
565
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
79,233✔
566
      int64_t currentTimeMs = taosGetTimestampMs();
74,073✔
567
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
74,073✔
568
                                          pGid->lastSyncAppliedIndexUpdateTime);
569

570
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
74,073✔
571
    }
572
  }
573

574
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
104,190,864✔
575
  pGid->syncCommitIndex = pVload->syncCommitIndex;
104,190,864✔
576
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
104,190,864✔
577
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
104,190,864✔
578
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
104,190,864✔
579
      pGid->startTimeMs != pVload->startTimeMs) {
98,547,849✔
580
    mInfo(
5,643,015✔
581
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
582
        "canRead:%d, dnode:%d",
583
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
584
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
585
    pGid->syncState = pVload->syncState;
5,643,015✔
586
    pGid->syncTerm = pVload->syncTerm;
5,643,015✔
587
    pGid->syncRestore = pVload->syncRestore;
5,643,015✔
588
    pGid->syncCanRead = pVload->syncCanRead;
5,643,015✔
589
    pGid->startTimeMs = pVload->startTimeMs;
5,643,015✔
590
    pGid->roleTimeMs = pVload->roleTimeMs;
5,643,015✔
591
    stateChanged = true;
5,643,015✔
592
  }
593
  return stateChanged;
104,190,864✔
594
}
595

596
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
24,277,110✔
597
  bool stateChanged = false;
24,277,110✔
598
  bool roleChanged = pObj->syncState != pMload->syncState ||
24,284,257✔
599
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
47,991,494✔
600
                     pObj->roleTimeMs != pMload->roleTimeMs;
23,714,384✔
601
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
24,277,110✔
602
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
583,291✔
603
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
604
          pObj->syncTerm, pMload->syncTerm);
605
    pObj->syncState = pMload->syncState;
583,291✔
606
    pObj->syncTerm = pMload->syncTerm;
583,291✔
607
    pObj->syncRestore = pMload->syncRestore;
583,291✔
608
    pObj->roleTimeMs = pMload->roleTimeMs;
583,291✔
609
    stateChanged = true;
583,291✔
610
  }
611
  return stateChanged;
24,277,110✔
612
}
613

614
extern char   *tsMonFwUri;
615
extern char   *tsMonSlowLogUri;
616
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
132✔
617
  SMnode    *pMnode = pReq->info.node;
132✔
618
  SStatisReq statisReq = {0};
132✔
619
  int32_t    code = -1;
132✔
620

621
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
132✔
622

623
  if (tsMonitorLogProtocol) {
132✔
624
    mInfo("process statis req,\n %s", statisReq.pCont);
132✔
625
  }
626

627
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
132✔
628
    monSendContent(statisReq.pCont, tsMonFwUri);
132✔
629
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
630
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
631
  }
632

633
  tFreeSStatisReq(&statisReq);
132✔
634
  return 0;
132✔
635
}
636

637
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
×
638
  mTrace("process audit req:%p", pReq);
×
639
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
×
640
    SMnode   *pMnode = pReq->info.node;
×
641
    SAuditReq auditReq = {0};
×
642

643
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
×
644

645
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
×
646

647
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
×
648
                   auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);
649

650
    tFreeSAuditReq(&auditReq);
×
651
  }
652
  return 0;
×
653
}
654

655
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
×
656
  mTrace("process audit req:%p", pReq);
×
657
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
×
658
    SMnode        *pMnode = pReq->info.node;
×
659
    SBatchAuditReq auditReq = {0};
×
660

661
    TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq));
×
662

663
    int32_t nAudit = taosArrayGetSize(auditReq.auditArr);
×
664

665
    for (int32_t i = 0; i < nAudit; ++i) {
×
666
      SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, i);
×
667
      mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);
×
668

669
      auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
×
670
                     audit->duration, audit->affectedRows);
671
    }
672

673
    tFreeSBatchAuditReq(&auditReq);
×
674
  }
675
  return 0;
×
676
}
677

678
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
795,187✔
679
  int32_t       code = 0, lino = 0;
795,187✔
680
  SDnodeInfoReq infoReq = {0};
795,187✔
681
  int32_t       contLen = 0;
795,187✔
682
  void         *pReq = NULL;
795,187✔
683

684
  infoReq.dnodeId = pDnode->id;
795,187✔
685
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
795,187✔
686

687
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
795,187✔
688
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
689
  }
690
  pReq = rpcMallocCont(contLen);
795,187✔
691
  if (pReq == NULL) {
795,187✔
692
    TAOS_RETURN(terrno);
×
693
  }
694

695
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
795,187✔
696
    code = contLen;
×
697
    goto _exit;
×
698
  }
699

700
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
795,187✔
701
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
795,187✔
702
_exit:
795,187✔
703
  if (code < 0) {
795,187✔
704
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
705
  }
706
  TAOS_RETURN(code);
795,187✔
707
}
708

709
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
795,187✔
710
  int32_t       code = 0, lino = 0;
795,187✔
711
  SMnode       *pMnode = pReq->info.node;
795,187✔
712
  SDnodeInfoReq infoReq = {0};
795,187✔
713
  SDnodeObj    *pDnode = NULL;
795,187✔
714
  STrans       *pTrans = NULL;
795,187✔
715
  SSdbRaw      *pCommitRaw = NULL;
795,187✔
716

717
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
795,187✔
718

719
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
795,187✔
720
  if (pDnode == NULL) {
795,187✔
721
    TAOS_CHECK_EXIT(terrno);
×
722
  }
723

724
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
795,187✔
725
  if (pTrans == NULL) {
795,187✔
726
    TAOS_CHECK_EXIT(terrno);
×
727
  }
728

729
  pDnode->updateTime = taosGetTimestampMs();
795,187✔
730

731
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
795,187✔
732
    TAOS_CHECK_EXIT(terrno);
×
733
  }
734
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
795,187✔
735
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
736
    TAOS_CHECK_EXIT(code);
×
737
  }
738
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
795,187✔
739
  pCommitRaw = NULL;
795,187✔
740

741
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
795,187✔
742
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
290✔
743
    TAOS_CHECK_EXIT(code);
290✔
744
  }
745

746
_exit:
795,187✔
747
  mndReleaseDnode(pMnode, pDnode);
795,187✔
748
  if (code != 0) {
795,187✔
749
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
290✔
750
  }
751
  mndTransDrop(pTrans);
795,187✔
752
  sdbFreeRaw(pCommitRaw);
795,187✔
753
  TAOS_RETURN(code);
795,187✔
754
}
755

756
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
41,890,922✔
757
  SMnode    *pMnode = pReq->info.node;
41,890,922✔
758
  SStatusReq statusReq = {0};
41,890,922✔
759
  SDnodeObj *pDnode = NULL;
41,890,922✔
760
  int32_t    code = -1;
41,890,922✔
761

762
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
41,890,922✔
763

764
  int64_t clusterid = mndGetClusterId(pMnode);
41,890,922✔
765
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
41,890,922✔
766
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
767
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
768
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
769
    goto _OVER;
×
770
  }
771

772
  if (statusReq.dnodeId == 0) {
41,890,922✔
773
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
1,254,725✔
774
    if (pDnode == NULL) {
1,254,725✔
775
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
533,130✔
776
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
533,130✔
777
      if (terrno != 0) code = terrno;
533,130✔
778
      goto _OVER;
533,130✔
779
    }
780
  } else {
781
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
40,636,197✔
782
    if (pDnode == NULL) {
40,636,197✔
783
      int32_t err = terrno;
254,815✔
784
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
254,815✔
785
      if (pDnode != NULL) {
254,815✔
786
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
1,405✔
787
        terrno = err;
1,405✔
788
        goto _OVER;
1,405✔
789
      }
790

791
      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
253,410✔
792
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
253,410✔
793
        terrno = err;
75,025✔
794
        goto _OVER;
75,025✔
795
      } else {
796
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
178,385✔
797
        if (pDnode == NULL) goto _OVER;
178,385✔
798
      }
799
    }
800
  }
801

802
  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
41,281,362✔
803
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);
41,281,362✔
804

805
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
41,281,362✔
806
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
41,281,362✔
807
  int64_t curMs = taosGetTimestampMs();
41,281,362✔
808
  bool    online = mndIsDnodeOnline(pDnode, curMs);
41,281,362✔
809
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
41,281,362✔
810
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
41,281,362✔
811
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
41,281,362✔
812
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
41,281,362✔
813
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
41,281,362✔
814
  bool    analVerChanged = (analVer != statusReq.analVer);
41,281,362✔
815
  bool    auditDBChanged = false;
41,281,362✔
816
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
41,281,362✔
817
  bool    auditTokenChanged = false;
41,281,362✔
818
  char    auditToken[TSDB_TOKEN_LEN] = {0};
41,281,362✔
819

820
  if (tsAuditUseToken) {
41,281,362✔
821
    SDbObj *pDb = mndAcquireAuditDb(pMnode);
41,281,362✔
822
    if (pDb != NULL) {
41,281,362✔
823
      SName name = {0};
1,300✔
824
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0)
1,300✔
825
        mError("db:%s, failed to parse db name", pDb->name);
×
826
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
1,300✔
827
      mndReleaseDb(pMnode, pDb);
1,300✔
828
    }
829
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;
41,281,362✔
830

831
    char    auditUser[TSDB_USER_LEN] = {0};
41,281,362✔
832
    int32_t ret = 0;
41,281,362✔
833
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
41,281,362✔
834
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
×
835
    } else {
836
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
41,281,362✔
837
      int32_t ret = 0;
41,281,362✔
838
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
41,281,362✔
839
        mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken,
41,280,062✔
840
               tstrerror(ret));
841
      } else {
842
        mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken);
1,300✔
843
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true;
1,300✔
844
      }
845
    }
846
  } 
847

848
  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
40,468,476✔
849
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
38,602,345✔
850
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged;
81,749,838✔
851
  const STraceId *trace = &pReq->info.traceId;
41,281,362✔
852
  char            timestamp[TD_TIME_STR_LEN] = {0};
41,281,362✔
853
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
41,281,362✔
854
  mGTrace(
41,281,362✔
855
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
856
      "timestamp:%s",
857
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
858

859
  if (reboot) {
41,281,362✔
860
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
848,319✔
861
  }
862

863
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
41,281,362✔
864
  if (labs(delta) >= tsTimestampDeltaLimit) {
41,281,362✔
865
    terrno = TSDB_CODE_TIME_UNSYNCED;
×
866
    code = terrno;
×
867

868
    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
×
869
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
×
870
           tstrerror(code), tsTimestampDeltaLimit);
871
    goto _OVER;
×
872
  }
873
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
146,595,333✔
874
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
105,313,971✔
875

876
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
105,313,971✔
877
    if (pVgroup != NULL) {
105,313,971✔
878
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
104,253,806✔
879
        pVgroup->cacheUsage = pVload->cacheUsage;
73,039,616✔
880
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
73,039,616✔
881
        pVgroup->numOfTables = pVload->numOfTables;
73,039,616✔
882
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
73,039,616✔
883
        pVgroup->totalStorage = pVload->totalStorage;
73,039,616✔
884
        pVgroup->compStorage = pVload->compStorage;
73,039,616✔
885
        pVgroup->pointsWritten = pVload->pointsWritten;
73,039,616✔
886
      }
887
      bool stateChanged = false;
104,253,806✔
888
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
148,120,424✔
889
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
148,057,482✔
890
        if (pGid->dnodeId == statusReq.dnodeId) {
148,057,482✔
891
          if (pVload->startTimeMs == 0) {
104,190,864✔
892
            pVload->startTimeMs = statusReq.rebootTime;
×
893
          }
894
          if (pVload->roleTimeMs == 0) {
104,190,864✔
895
            pVload->roleTimeMs = statusReq.rebootTime;
×
896
          }
897
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
104,190,864✔
898
          break;
104,190,864✔
899
        }
900
      }
901
      if (stateChanged) {
104,253,806✔
902
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
5,643,015✔
903
        if (pDb != NULL && pDb->stateTs != curMs) {
5,643,015✔
904
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
3,909,507✔
905
                pDb->stateTs, curMs);
906
          pDb->stateTs = curMs;
3,909,507✔
907
        }
908
        mndReleaseDb(pMnode, pDb);
5,643,015✔
909
      }
910
    }
911

912
    mndReleaseVgroup(pMnode, pVgroup);
105,313,971✔
913
  }
914

915
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
41,281,362✔
916
  if (pObj != NULL) {
41,281,362✔
917
    if (statusReq.mload.roleTimeMs == 0) {
24,277,110✔
918
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
275,093✔
919
    }
920
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
24,277,110✔
921
    mndReleaseMnode(pMnode, pObj);
24,277,110✔
922
  }
923

924
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
41,281,362✔
925
  if (pQnode != NULL) {
41,281,362✔
926
    pQnode->load = statusReq.qload;
253,152✔
927
    mndReleaseQnode(pMnode, pQnode);
253,152✔
928
  }
929

930
  if (needCheck) {
41,281,362✔
931
    if (statusReq.sver != tsVersion) {
2,679,743✔
932
      if (pDnode != NULL) {
×
933
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
934
      }
935
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
936
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
937
      goto _OVER;
×
938
    }
939

940
    if (statusReq.dnodeId == 0) {
2,679,743✔
941
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
721,595✔
942
    } else {
943
      if (statusReq.clusterId != pMnode->clusterId) {
1,958,148✔
944
        if (pDnode != NULL) {
×
945
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
946
        }
947
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
948
               pMnode->clusterId);
949
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
950
        goto _OVER;
×
951
      }
952
    }
953

954
    // Verify whether the cluster parameters are consistent when status change from offline to ready
955
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
2,679,743✔
956
    if (pDnode->offlineReason != 0) {
2,679,743✔
957
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
7,124✔
958
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
7,124✔
959
      goto _OVER;
7,124✔
960
    }
961

962
    if (!online) {
2,672,619✔
963
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
805,762✔
964
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
965
    } else {
966
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
1,866,857✔
967
            statusReq.dnodeVer, dnodeVer, reboot);
968
    }
969

970
    pDnode->rebootTime = statusReq.rebootTime;
2,672,619✔
971
    pDnode->numOfCores = statusReq.numOfCores;
2,672,619✔
972
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
2,672,619✔
973
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
2,672,619✔
974
    pDnode->memAvail = statusReq.memAvail;
2,672,619✔
975
    pDnode->memTotal = statusReq.memTotal;
2,672,619✔
976
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
2,672,619✔
977
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
2,672,619✔
978
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
2,672,619✔
979
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
795,187✔
980
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
795,187✔
981
        goto _OVER;
×
982
      }
983
    }
984

985
    SStatusRsp statusRsp = {0};
2,672,619✔
986
    statusRsp.statusSeq++;
2,672,619✔
987
    statusRsp.analVer = analVer;
2,672,619✔
988
    statusRsp.dnodeVer = dnodeVer;
2,672,619✔
989
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
2,672,619✔
990
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
2,672,619✔
991
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
2,672,619✔
992
    if (statusRsp.pDnodeEps == NULL) {
2,672,619✔
993
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
994
      goto _OVER;
×
995
    }
996

997
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
2,672,619✔
998
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
2,672,619✔
999
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;
2,672,619✔
1000

1001
    if (auditDB[0] != '\0') {
2,672,619✔
1002
      mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB);
130✔
1003
      tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
130✔
1004
    }
1005
    if (auditToken[0] != '\0') {
2,672,619✔
1006
      mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken);
130✔
1007
      tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
130✔
1008
    }
1009

1010
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
2,672,619✔
1011
    void   *pHead = rpcMallocCont(contLen);
2,672,619✔
1012
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
2,672,619✔
1013
    taosArrayDestroy(statusRsp.pDnodeEps);
2,672,619✔
1014
    if (contLen < 0) {
2,672,619✔
1015
      code = contLen;
×
1016
      goto _OVER;
×
1017
    }
1018

1019
    pReq->info.rspLen = contLen;
2,672,619✔
1020
    pReq->info.rsp = pHead;
2,672,619✔
1021
  }
1022

1023
  pDnode->accessTimes++;
41,274,238✔
1024
  pDnode->lastAccessTime = curMs;
41,274,238✔
1025
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
41,274,238✔
1026
    pDnode->offlineReason = DND_REASON_ONLINE;
×
1027
  }
1028
  code = 0;
41,274,238✔
1029

1030
_OVER:
41,890,922✔
1031
  mndReleaseDnode(pMnode, pDnode);
41,890,922✔
1032
  taosArrayDestroy(statusReq.pVloads);
41,890,922✔
1033
  if (code != 0) {
41,890,922✔
1034
    mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code));
533,130✔
1035
    return code;
533,130✔
1036
  }
1037

1038
  return mndUpdClusterInfo(pReq);
41,357,792✔
1039
}
1040

1041
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
1042
  SMnode    *pMnode = pReq->info.node;
×
1043
  SNotifyReq notifyReq = {0};
×
1044
  int32_t    code = 0;
×
1045

1046
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
1047
    terrno = code;
×
1048
    goto _OVER;
×
1049
  }
1050

1051
  int64_t clusterid = mndGetClusterId(pMnode);
×
1052
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
1053
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
1054
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
1055
          notifyReq.clusterId, clusterid, tstrerror(code));
1056
    goto _OVER;
×
1057
  }
1058

1059
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
1060
  for (int32_t v = 0; v < nVgroup; ++v) {
×
1061
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
1062

1063
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
1064
    if (pVgroup != NULL) {
×
1065
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
1066
      mndReleaseVgroup(pMnode, pVgroup);
×
1067
    }
1068
  }
1069
  code = mndUpdClusterInfo(pReq);
×
1070
_OVER:
×
1071
  tFreeSNotifyReq(&notifyReq);
×
1072
  return code;
×
1073
}
1074

1075
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
196,992✔
1076
  int32_t  code = -1;
196,992✔
1077
  SSdbRaw *pRaw = NULL;
196,992✔
1078
  STrans  *pTrans = NULL;
196,992✔
1079

1080
  SDnodeObj dnodeObj = {0};
196,992✔
1081
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
196,992✔
1082
  dnodeObj.createdTime = taosGetTimestampMs();
196,992✔
1083
  dnodeObj.updateTime = dnodeObj.createdTime;
196,992✔
1084
  dnodeObj.port = pCreate->port;
196,992✔
1085
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
196,992✔
1086
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
196,992✔
1087

1088
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
196,992✔
1089
  if (pTrans == NULL) {
196,992✔
1090
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1091
    if (terrno != 0) code = terrno;
×
1092
    goto _OVER;
×
1093
  }
1094
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
196,992✔
1095
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
196,992✔
1096

1097
  pRaw = mndDnodeActionEncode(&dnodeObj);
196,992✔
1098
  if (pRaw == NULL) {
196,992✔
1099
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1100
    if (terrno != 0) code = terrno;
×
1101
    goto _OVER;
×
1102
  }
1103
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
196,992✔
1104
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
196,992✔
1105
  pRaw = NULL;
196,992✔
1106

1107
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
196,992✔
1108
  code = 0;
196,992✔
1109

1110
_OVER:
196,992✔
1111
  mndTransDrop(pTrans);
196,992✔
1112
  sdbFreeRaw(pRaw);
196,992✔
1113
  return code;
196,992✔
1114
}
1115

1116
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
77,574✔
1117
  SMnode       *pMnode = pReq->info.node;
77,574✔
1118
  SSdb         *pSdb = pMnode->pSdb;
77,574✔
1119
  SDnodeObj    *pObj = NULL;
77,574✔
1120
  void         *pIter = NULL;
77,574✔
1121
  SDnodeListRsp rsp = {0};
77,574✔
1122
  int32_t       code = -1;
77,574✔
1123

1124
  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
77,574✔
1125
  if (NULL == rsp.dnodeList) {
77,574✔
1126
    mError("failed to alloc epSet while process dnode list req");
×
1127
    code = terrno;
×
1128
    goto _OVER;
×
1129
  }
1130

1131
  while (1) {
174,208✔
1132
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
251,782✔
1133
    if (pIter == NULL) break;
251,782✔
1134

1135
    SDNodeAddr dnodeAddr = {0};
174,208✔
1136
    dnodeAddr.nodeId = pObj->id;
174,208✔
1137
    dnodeAddr.epSet.numOfEps = 1;
174,208✔
1138
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
174,208✔
1139
    dnodeAddr.epSet.eps[0].port = pObj->port;
174,208✔
1140

1141
    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
348,416✔
1142
      if (terrno != 0) code = terrno;
×
1143
      sdbRelease(pSdb, pObj);
×
1144
      sdbCancelFetch(pSdb, pIter);
×
1145
      goto _OVER;
×
1146
    }
1147

1148
    sdbRelease(pSdb, pObj);
174,208✔
1149
  }
1150

1151
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
77,574✔
1152
  void   *pRsp = rpcMallocCont(rspLen);
77,574✔
1153
  if (pRsp == NULL) {
77,574✔
1154
    code = terrno;
×
1155
    goto _OVER;
×
1156
  }
1157

1158
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
77,574✔
1159
    code = rspLen;
×
1160
    goto _OVER;
×
1161
  }
1162

1163
  pReq->info.rspLen = rspLen;
77,574✔
1164
  pReq->info.rsp = pRsp;
77,574✔
1165
  code = 0;
77,574✔
1166

1167
_OVER:
77,574✔
1168

1169
  if (code != 0) {
77,574✔
1170
    mError("failed to get dnode list since %s", tstrerror(code));
×
1171
  }
1172

1173
  tFreeSDnodeListRsp(&rsp);
77,574✔
1174

1175
  TAOS_RETURN(code);
77,574✔
1176
}
1177

1178
void getSlowLogScopeString(int32_t scope, char *result) {
2,506✔
1179
  if (scope == SLOW_LOG_TYPE_NULL) {
2,506✔
1180
    (void)strncat(result, "NONE", 64);
×
1181
    return;
×
1182
  }
1183
  while (scope > 0) {
5,012✔
1184
    if (scope & SLOW_LOG_TYPE_QUERY) {
2,506✔
1185
      (void)strncat(result, "QUERY", 64);
2,506✔
1186
      scope &= ~SLOW_LOG_TYPE_QUERY;
2,506✔
1187
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1188
      (void)strncat(result, "INSERT", 64);
×
1189
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1190
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1191
      (void)strncat(result, "OTHERS", 64);
×
1192
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1193
    } else {
1194
      (void)printf("invalid slow log scope:%d", scope);
×
1195
      return;
×
1196
    }
1197

1198
    if (scope > 0) {
2,506✔
1199
      (void)strncat(result, "|", 64);
×
1200
    }
1201
  }
1202
}
1203

1204
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
197,350✔
1205
  SMnode         *pMnode = pReq->info.node;
197,350✔
1206
  int32_t         code = -1;
197,350✔
1207
  SDnodeObj      *pDnode = NULL;
197,350✔
1208
  SCreateDnodeReq createReq = {0};
197,350✔
1209
  int32_t         lino = 0;
197,350✔
1210
  int64_t         tss = taosGetTimestampMs();
197,350✔
1211

1212
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
197,350✔
1213
    goto _OVER;
×
1214
  }
1215

1216
  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
197,350✔
1217
  TAOS_CHECK_GOTO(code, &lino, _OVER);
197,350✔
1218

1219
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
197,350✔
1220
  code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE);
197,350✔
1221
  TAOS_CHECK_GOTO(code, &lino, _OVER);
197,350✔
1222

1223
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
196,992✔
1224
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1225
    goto _OVER;
×
1226
  }
1227
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
1228
  // if (code != 0) {
1229
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
1230
  //          tstrerror(code));
1231
  //   goto _OVER;
1232
  // }
1233

1234
  char ep[TSDB_EP_LEN];
196,992✔
1235
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
196,992✔
1236
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
196,992✔
1237
  if (pDnode != NULL) {
196,992✔
1238
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1239
    goto _OVER;
×
1240
  }
1241

1242
  code = mndCreateDnode(pMnode, pReq, &createReq);
196,992✔
1243
  if (code == 0) {
196,992✔
1244
    code = TSDB_CODE_ACTION_IN_PROGRESS;
196,992✔
1245
    tsGrantHBInterval = 5;
196,992✔
1246
  }
1247

1248
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
196,992✔
1249
    char obj[200] = {0};
196,992✔
1250
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
196,992✔
1251

1252
    int64_t tse = taosGetTimestampMs();
196,992✔
1253
    double  duration = (double)(tse - tss);
196,992✔
1254
    duration = duration / 1000;
196,992✔
1255
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
196,992✔
1256
  }
1257

1258
_OVER:
197,350✔
1259
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
197,350✔
1260
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
358✔
1261
  }
1262

1263
  mndReleaseDnode(pMnode, pDnode);
197,350✔
1264
  tFreeSCreateDnodeReq(&createReq);
197,350✔
1265
  TAOS_RETURN(code);
197,350✔
1266
}
1267

1268
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1269

1270
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
4,960✔
1271

1272
#ifndef TD_ENTERPRISE
1273
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1274
#endif
1275

1276
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
12,858✔
1277
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1278
  int32_t  code = -1;
12,858✔
1279
  SSdbRaw *pRaw = NULL;
12,858✔
1280
  STrans  *pTrans = NULL;
12,858✔
1281
  int32_t  lino = 0;
12,858✔
1282

1283
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
12,858✔
1284
  if (pTrans == NULL) {
12,858✔
1285
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1286
    if (terrno != 0) code = terrno;
×
1287
    goto _OVER;
×
1288
  }
1289
  mndTransSetGroupParallel(pTrans);
12,858✔
1290
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
12,858✔
1291
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
12,858✔
1292

1293
  pRaw = mndDnodeActionEncode(pDnode);
12,858✔
1294
  if (pRaw == NULL) {
12,858✔
1295
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1296
    if (terrno != 0) code = terrno;
×
1297
    goto _OVER;
×
1298
  }
1299
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
12,858✔
1300
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
12,858✔
1301
  pRaw = NULL;
12,858✔
1302

1303
  pRaw = mndDnodeActionEncode(pDnode);
12,858✔
1304
  if (pRaw == NULL) {
12,858✔
1305
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1306
    if (terrno != 0) code = terrno;
×
1307
    goto _OVER;
×
1308
  }
1309
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
12,858✔
1310
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
12,858✔
1311
  pRaw = NULL;
12,858✔
1312

1313
  if (pSObj != NULL) {
12,858✔
1314
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
760✔
1315
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
760✔
1316
  }
1317

1318
  if (pMObj != NULL) {
12,858✔
1319
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
627✔
1320
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
627✔
1321
  }
1322

1323
  if (pQObj != NULL) {
12,858✔
1324
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
347✔
1325
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
347✔
1326
  }
1327

1328
  if (pBObj != NULL) {
12,858✔
1329
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
1,326✔
1330
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
1,326✔
1331
  }
1332

1333
  if (numOfVnodes > 0) {
11,532✔
1334
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,296✔
1335
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,296✔
1336
  }
1337

1338
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
11,532✔
1339

1340
  code = 0;
11,532✔
1341

1342
_OVER:
12,858✔
1343
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
12,858✔
1344
  mndTransDrop(pTrans);
12,858✔
1345
  sdbFreeRaw(pRaw);
12,858✔
1346
  TAOS_RETURN(code);
12,858✔
1347
}
1348

1349
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
1350
  bool       isEmpty = false;
×
1351
  SMnodeObj *pMObj = NULL;
×
1352
  SQnodeObj *pQObj = NULL;
×
1353
  SSnodeObj *pSObj = NULL;
×
1354

1355
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
1356
  if (pQObj) goto _OVER;
×
1357

1358
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
1359
  if (pSObj) goto _OVER;
×
1360

1361
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
1362
  if (pMObj) goto _OVER;
×
1363

1364
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
1365
  if (numOfVnodes > 0) goto _OVER;
×
1366

1367
  isEmpty = true;
×
1368
_OVER:
×
1369
  mndReleaseMnode(pMnode, pMObj);
×
1370
  mndReleaseQnode(pMnode, pQObj);
×
1371
  mndReleaseSnode(pMnode, pSObj);
×
1372
  return isEmpty;
×
1373
}
1374

1375
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
15,274✔
1376
  SMnode       *pMnode = pReq->info.node;
15,274✔
1377
  int32_t       code = -1;
15,274✔
1378
  SDnodeObj    *pDnode = NULL;
15,274✔
1379
  SMnodeObj    *pMObj = NULL;
15,274✔
1380
  SQnodeObj    *pQObj = NULL;
15,274✔
1381
  SSnodeObj    *pSObj = NULL;
15,274✔
1382
  SBnodeObj    *pBObj = NULL;
15,274✔
1383
  SDropDnodeReq dropReq = {0};
15,274✔
1384
  int64_t       tss = taosGetTimestampMs();
15,274✔
1385

1386
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
15,274✔
1387

1388
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
15,274✔
1389
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1390
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);
15,274✔
1391

1392
  bool force = dropReq.force;
14,916✔
1393
  if (dropReq.unsafe) {
14,916✔
1394
    force = true;
×
1395
  }
1396

1397
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
14,916✔
1398
  if (pDnode == NULL) {
14,916✔
1399
    int32_t err = terrno;
×
1400
    char    ep[TSDB_EP_LEN + 1] = {0};
×
1401
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
1402
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
1403
    if (pDnode == NULL) {
×
1404
      code = err;
×
1405
      goto _OVER;
×
1406
    }
1407
  }
1408

1409
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
14,916✔
1410
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
14,916✔
1411
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
14,916✔
1412
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
14,916✔
1413
  if (pMObj != NULL) {
14,916✔
1414
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
2,243✔
1415
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
989✔
1416
      goto _OVER;
989✔
1417
    }
1418
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
1,254✔
1419
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
627✔
1420
      goto _OVER;
627✔
1421
    }
1422
  }
1423

1424
#ifdef USE_MOUNT
1425
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
13,300✔
1426
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
1427
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1428
    goto _OVER;
×
1429
  }
1430
#endif
1431

1432
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
13,300✔
1433
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
13,300✔
1434

1435
  if (isonline && force) {
13,300✔
1436
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1437
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1438
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
1439
    goto _OVER;
×
1440
  }
1441

1442
  mError("vnode num:%d", numOfVnodes);
13,300✔
1443

1444
  bool    vnodeOffline = false;
13,300✔
1445
  void   *pIter = NULL;
13,300✔
1446
  int32_t vgId = -1;
13,300✔
1447
  while (1) {
24,828✔
1448
    SVgObj *pVgroup = NULL;
38,128✔
1449
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
38,128✔
1450
    if (pIter == NULL) break;
38,128✔
1451

1452
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
75,116✔
1453
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
50,288✔
1454
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
50,288✔
1455
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
17,186✔
1456
          vgId = pVgroup->vgId;
×
1457
          vnodeOffline = true;
×
1458
          break;
×
1459
        }
1460
      }
1461
    }
1462

1463
    sdbRelease(pMnode->pSdb, pVgroup);
24,828✔
1464

1465
    if (vnodeOffline) {
24,828✔
1466
      sdbCancelFetch(pMnode->pSdb, pIter);
×
1467
      break;
×
1468
    }
1469
  }
1470

1471
  if (vnodeOffline && !force) {
13,300✔
1472
    code = TSDB_CODE_VND_VNODE_OFFLINE;
×
1473
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
×
1474
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1475
    goto _OVER;
×
1476
  }
1477

1478
  if (!isonline && !force) {
13,300✔
1479
    code = TSDB_CODE_DNODE_OFFLINE;
442✔
1480
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
442✔
1481
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1482
    goto _OVER;
442✔
1483
  }
1484

1485
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
12,858✔
1486
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
12,858✔
1487

1488
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
12,858✔
1489
    char obj1[30] = {0};
12,858✔
1490
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
12,858✔
1491

1492
    int64_t tse = taosGetTimestampMs();
12,858✔
1493
    double  duration = (double)(tse - tss);
12,858✔
1494
    duration = duration / 1000;
12,858✔
1495
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
12,858✔
1496
  }
1497

1498
_OVER:
15,274✔
1499
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
15,274✔
1500
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
3,742✔
1501
  }
1502

1503
  mndReleaseDnode(pMnode, pDnode);
15,274✔
1504
  mndReleaseMnode(pMnode, pMObj);
15,274✔
1505
  mndReleaseQnode(pMnode, pQObj);
15,274✔
1506
  mndReleaseBnode(pMnode, pBObj);
15,274✔
1507
  mndReleaseSnode(pMnode, pSObj);
15,274✔
1508
  tFreeSDropDnodeReq(&dropReq);
15,274✔
1509
  TAOS_RETURN(code);
15,274✔
1510
}
1511

1512
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
1,418✔
1513
  int32_t code = 0;
1,418✔
1514
  SMnode *pMnode = pReq->info.node;
1,418✔
1515
  SSdb   *pSdb = pMnode->pSdb;
1,418✔
1516
  void   *pIter = NULL;
1,418✔
1517
  int8_t  encrypting = 0;
1,418✔
1518

1519
  const STraceId *trace = &pReq->info.traceId;
1,418✔
1520

1521
  int32_t klen = strlen(pDcfgReq->value);
1,418✔
1522
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
1,418✔
1523
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1524
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1525
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1526
    goto _exit;
×
1527
  }
1528

1529
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
1,418✔
1530
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1531
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1532
    goto _exit;
×
1533
  }
1534

1535
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
1,418✔
1536
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1537
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1538
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1539
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1540
    goto _exit;
×
1541
  }
1542

1543
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
1,418✔
1544
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
1,418✔
1545
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
1,418✔
1546

1547
  while (1) {
3,042✔
1548
    SDnodeObj *pDnode = NULL;
4,460✔
1549
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
4,460✔
1550
    if (pIter == NULL) break;
4,460✔
1551
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
3,042✔
1552
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1553
             offlineReason[pDnode->offlineReason]);
1554
      sdbRelease(pSdb, pDnode);
×
1555
      continue;
×
1556
    }
1557

1558
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
3,042✔
1559
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
3,042✔
1560
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
3,042✔
1561
      void   *pBuf = rpcMallocCont(bufLen);
3,042✔
1562

1563
      if (pBuf != NULL) {
3,042✔
1564
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
3,042✔
1565
          code = bufLen;
×
1566
          sdbRelease(pSdb, pDnode);
×
1567
          goto _exit;
×
1568
        }
1569
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
3,042✔
1570
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
3,042✔
1571
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
3,042✔
1572
        }
1573
      }
1574
    }
1575

1576
    sdbRelease(pSdb, pDnode);
3,042✔
1577
  }
1578

1579
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
1,418✔
1580
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1581
  }
1582

1583
_exit:
1,418✔
1584
  if (code != 0) {
1,418✔
1585
    if (terrno == 0) terrno = code;
×
1586
  }
1587
  return code;
1,418✔
1588
}
1589

1590
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
1,418✔
1591
  int32_t code = 0;
1,418✔
1592

1593
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
1594
  SMnode       *pMnode = pReq->info.node;
1,418✔
1595
  SMCfgDnodeReq cfgReq = {0};
1,418✔
1596
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
1,418✔
1597

1598
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
1,418✔
1599
    tFreeSMCfgDnodeReq(&cfgReq);
×
1600
    TAOS_RETURN(code);
×
1601
  }
1602
  const STraceId *trace = &pReq->info.traceId;
1,418✔
1603
  SDCfgDnodeReq   dcfgReq = {0};
1,418✔
1604
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
1,418✔
1605
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
1,418✔
1606
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
1,418✔
1607
    tFreeSMCfgDnodeReq(&cfgReq);
1,418✔
1608
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
1,418✔
1609
  } else {
1610
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1611
    tFreeSMCfgDnodeReq(&cfgReq);
×
1612
    TAOS_RETURN(code);
×
1613
  }
1614

1615
#else
1616
  TAOS_RETURN(code);
1617
#endif
1618
}
1619

1620
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
3,042✔
1621
  SMnode *pMnode = pRsp->info.node;
3,042✔
1622
  int16_t nSuccess = 0;
3,042✔
1623
  int16_t nFailed = 0;
3,042✔
1624

1625
  if (0 == pRsp->code) {
3,042✔
1626
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
3,042✔
1627
  } else {
1628
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1629
  }
1630

1631
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
3,042✔
1632
  bool    finished = nSuccess + nFailed >= nReq;
3,042✔
1633

1634
  if (finished) {
3,042✔
1635
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
1,418✔
1636
  }
1637

1638
  const STraceId *trace = &pRsp->info.traceId;
3,042✔
1639
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
3,042✔
1640
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1641

1642
  return 0;
3,042✔
1643
}
1644

1645
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
2,506✔
1646
  SMnode *pMnode = pReq->info.node;
2,506✔
1647
  int32_t totalRows = 0;
2,506✔
1648
  int32_t numOfRows = 0;
2,506✔
1649
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
2,506✔
1650
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
2,506✔
1651
  char   *pWrite = NULL;
2,506✔
1652
  int32_t cols = 0;
2,506✔
1653
  int32_t code = 0;
2,506✔
1654
  int32_t lino = 0;
2,506✔
1655

1656
  cfgOpts[totalRows] = "statusIntervalMs";
2,506✔
1657
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
2,506✔
1658
  totalRows++;
2,506✔
1659

1660
  cfgOpts[totalRows] = "timezone";
2,506✔
1661
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
2,506✔
1662
  totalRows++;
2,506✔
1663

1664
  cfgOpts[totalRows] = "locale";
2,506✔
1665
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
2,506✔
1666
  totalRows++;
2,506✔
1667

1668
  cfgOpts[totalRows] = "charset";
2,506✔
1669
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
2,506✔
1670
  totalRows++;
2,506✔
1671

1672
  cfgOpts[totalRows] = "monitor";
2,506✔
1673
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
2,506✔
1674
  totalRows++;
2,506✔
1675

1676
  cfgOpts[totalRows] = "monitorInterval";
2,506✔
1677
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
2,506✔
1678
  totalRows++;
2,506✔
1679

1680
  cfgOpts[totalRows] = "slowLogThreshold";
2,506✔
1681
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
2,506✔
1682
  totalRows++;
2,506✔
1683

1684
  cfgOpts[totalRows] = "slowLogMaxLen";
2,506✔
1685
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
2,506✔
1686
  totalRows++;
2,506✔
1687

1688
  char scopeStr[64] = {0};
2,506✔
1689
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
2,506✔
1690
  cfgOpts[totalRows] = "slowLogScope";
2,506✔
1691
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
2,506✔
1692
  totalRows++;
2,506✔
1693

1694
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
2,506✔
1695
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
2,506✔
1696

1697
  for (int32_t i = 0; i < totalRows; i++) {
25,060✔
1698
    cols = 0;
22,554✔
1699

1700
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
22,554✔
1701
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
22,554✔
1702
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
22,554✔
1703

1704
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
22,554✔
1705
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
22,554✔
1706
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
22,554✔
1707

1708
    numOfRows++;
22,554✔
1709
  }
1710

1711
_OVER:
2,506✔
1712
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
2,506✔
1713
  pShow->numOfRows += numOfRows;
2,506✔
1714
  return numOfRows;
2,506✔
1715
}
1716

1717
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1718

1719
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
902,933✔
1720
  SMnode    *pMnode = pReq->info.node;
902,933✔
1721
  SSdb      *pSdb = pMnode->pSdb;
902,933✔
1722
  int32_t    numOfRows = 0;
902,933✔
1723
  int32_t    cols = 0;
902,933✔
1724
  ESdbStatus objStatus = 0;
902,933✔
1725
  SDnodeObj *pDnode = NULL;
902,933✔
1726
  int64_t    curMs = taosGetTimestampMs();
902,933✔
1727
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
902,719✔
1728
  int32_t    code = 0;
902,933✔
1729
  int32_t    lino = 0;
902,933✔
1730

1731
  while (numOfRows < rows) {
3,383,573✔
1732
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
3,383,573✔
1733
    if (pShow->pIter == NULL) break;
3,383,573✔
1734
    bool online = mndIsDnodeOnline(pDnode, curMs);
2,480,640✔
1735

1736
    cols = 0;
2,480,640✔
1737

1738
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1739
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
2,480,640✔
1740

1741
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
2,480,640✔
1742

1743
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1744
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,480,640✔
1745

1746
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1747
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
2,480,640✔
1748
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
2,480,640✔
1749

1750
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1751
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
2,480,640✔
1752
                        &lino, _OVER);
1753

1754
    const char *status = "ready";
2,480,640✔
1755
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
2,480,640✔
1756
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
2,480,640✔
1757
    if (!online) {
2,480,640✔
1758
      if (objStatus == SDB_STATUS_CREATING)
256,744✔
1759
        status = "creating*";
×
1760
      else if (objStatus == SDB_STATUS_DROPPING)
256,744✔
1761
        status = "dropping*";
×
1762
      else
1763
        status = "offline";
256,744✔
1764
    }
1765

1766
    STR_TO_VARSTR(buf, status);
2,480,640✔
1767
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1768
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,480,640✔
1769

1770
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1771
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
2,480,640✔
1772
                        _OVER);
1773

1774
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1775
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
2,480,640✔
1776
                        _OVER);
1777

1778
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
2,480,640✔
1779
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
2,480,640✔
1780

1781
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1782
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
2,480,640✔
1783
    taosMemoryFreeClear(b);
2,480,640✔
1784

1785
#ifdef TD_ENTERPRISE
1786
    STR_TO_VARSTR(buf, pDnode->machineId);
2,480,640✔
1787
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,480,640✔
1788
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
2,480,640✔
1789
#endif
1790

1791
    numOfRows++;
2,480,640✔
1792
    sdbRelease(pSdb, pDnode);
2,480,640✔
1793
  }
1794

1795
_OVER:
902,719✔
1796
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
902,933✔
1797

1798
  pShow->numOfRows += numOfRows;
902,933✔
1799
  return numOfRows;
902,933✔
1800
}
1801

1802
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1803
  SSdb *pSdb = pMnode->pSdb;
×
1804
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1805
}
×
1806

1807
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
×
1808
  int32_t    code = 0;
×
1809
  SDnodeObj *pObj = NULL;
×
1810
  void      *pIter = NULL;
×
1811
  SSdb      *pSdb = pMnode->pSdb;
×
1812
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
×
1813
  if (fqdns == NULL) {
×
1814
    mError("failed to init fqdns array");
×
1815
    return NULL;
×
1816
  }
1817

1818
  while (1) {
×
1819
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
×
1820
    if (pIter == NULL) break;
×
1821

1822
    char *fqdn = taosStrdup(pObj->fqdn);
×
1823
    if (fqdn == NULL) {
×
1824
      sdbRelease(pSdb, pObj);
×
1825
      mError("failed to strdup fqdn:%s", pObj->fqdn);
×
1826

1827
      code = terrno;
×
1828
      break;
×
1829
    }
1830

1831
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
×
1832
      mError("failed to fqdn into array, but continue at this time");
×
1833
    }
1834
    sdbRelease(pSdb, pObj);
×
1835
  }
1836

1837
_error:
×
1838
  if (code != 0) {
×
1839
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
×
1840
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
×
1841
      taosMemoryFreeClear(pFqdn);
×
1842
    }
1843
    taosArrayDestroy(fqdns);
×
1844
    fqdns = NULL;
×
1845
  }
1846

1847
  return fqdns;
×
1848
}
1849

1850
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
×
1851
  if (p == NULL) return NULL;
×
1852

1853
  switch (type) {
×
1854
    case SDB_DNODE:
×
1855
      return (SDnodeObj *)p;
×
1856
    case SDB_QNODE:
×
1857
      return ((SQnodeObj *)p)->pDnode;
×
1858
    case SDB_SNODE:
×
1859
      return ((SSnodeObj *)p)->pDnode;
×
1860
    case SDB_BNODE:
×
1861
      return ((SBnodeObj *)p)->pDnode;
×
1862
    default:
×
1863
      break;
×
1864
  }
1865
  return NULL;
×
1866
}
1867
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
×
1868
  int32_t lino = 0;
×
1869
  SSdb   *pSdb = pMnode->pSdb;
×
1870
  void   *pIter = NULL;
×
1871
  int32_t code = 0;
×
1872

1873
  while (1) {
×
1874
    void *pObj = NULL;
×
1875
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
×
1876
    if (pIter == NULL) break;
×
1877

1878
    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
×
1879
    if (pDnodeObj == NULL) {
×
1880
      mError("null dnode object for type:%d", type);
×
1881
      sdbRelease(pSdb, pObj);
×
1882
      continue;
×
1883
    }
1884

1885
    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
×
1886
    if (taosArrayPush(pAddr, &epSet) == NULL) {
×
1887
      mError("failed to push addr into array");
×
1888
      sdbRelease(pSdb, pObj);
×
1889
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1890
    }
1891
    sdbRelease(pSdb, pObj);
×
1892
  }
1893

1894
_exit:
×
1895
  return code;
×
1896
}
1897

1898
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
×
1899
  int32_t lino = 0;
×
1900
  int32_t code = 0;
×
1901
  if (pMnode == NULL || pAddr == NULL) {
×
1902
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
×
1903
  }
1904

1905
  code = mndGetAllNodeAddrByType(pMnode, SDB_QNODE, pAddr);
×
1906
  TAOS_CHECK_GOTO(code, &lino, _error);
×
1907

1908
  code = mndGetAllNodeAddrByType(pMnode, SDB_SNODE, pAddr);
×
1909
  TAOS_CHECK_GOTO(code, &lino, _error);
×
1910

1911
  code = mndGetAllNodeAddrByType(pMnode, SDB_BNODE, pAddr);
×
1912
  TAOS_CHECK_GOTO(code, &lino, _error);
×
1913

1914
  code = mndGetAllNodeAddrByType(pMnode, SDB_DNODE, pAddr);
×
1915
  TAOS_CHECK_GOTO(code, &lino, _error);
×
1916

1917
_error:
×
1918
  return code;
×
1919
}
1920

1921
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
×
1922
  int32_t code = 0;
×
1923

1924
  SMnode *pMnode = pReq->info.node;
×
1925
  void   *pIter = NULL;
×
1926
  SSdb   *pSdb = pMnode->pSdb;
×
1927
  mInfo("start to reload dnode tls config");
×
1928

1929
  SMCfgDnodeReq req = {0};
×
1930
  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
×
1931
    goto _OVER;
×
1932
  }
1933

1934
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
×
1935
    goto _OVER;
×
1936
  }
1937

1938
  SArray *pAddr = taosArrayInit(4, sizeof(SEpSet));
×
1939
  if (pAddr == NULL) {
×
1940
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
×
1941
  }
1942

1943
  code = mndGetAllNodeAddr(pMnode, pAddr);
×
1944

1945
  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
×
1946
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
×
1947
    // SEpSet epSet = mndCreateEpSetByStr(addr);
1948
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
×
1949
    code = tmsgSendReq(pEpSet, &rpcMsg);
×
1950
    if (code != 0) {
×
1951
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
×
1952
    }
1953
  }
1954

1955
_OVER:
×
1956
  tFreeSMCfgDnodeReq(&req);
×
1957
  taosArrayDestroy(pAddr);
×
1958
  return code;
×
1959
}
1960

1961
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
×
1962
  int32_t code = 0;
×
1963
  if (pRsp->code != 0) {
×
1964
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
×
1965
  } else {
1966
    mInfo("succeed to reload dnode tls config");
×
1967
  }
1968
  return code;
×
1969
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc