• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.6
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "audit.h"
19
#include "mndCluster.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tconfig.h"
32
#include "tjson.h"
33
#include "tmisce.h"
34
#include "tunit.h"
35

36
#define TSDB_DNODE_VER_NUMBER   2
37
#define TSDB_DNODE_RESERVE_SIZE 40
38

39
/* Human-readable offline reasons. The number in front of each entry is its array index. */
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
61

62
/* Anonymous enum with the two activation-code selectors; values are spelled out explicitly. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
66

67
/* Anonymous enum with the dnode operation kinds; values are spelled out explicitly. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
72

73
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
74
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
75
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
76
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
79
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
84
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
85
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
87
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
89
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
90
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
91

92
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
94
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
96

97
#ifdef _GRANT
98
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
99
#else
100
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
101
#endif
102

103
/*
 * Wires up everything this file contributes to the mnode: the message handlers, the show-table
 * retrieve / free-iterator callbacks, and the SDB_DNODE table description.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* Message handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* Show-table callbacks for the configs and dnode tables. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  /* SDB table description: int32 key plus the deploy/encode/decode/insert/update/delete hooks. */
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
134

135
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
136
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
137
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
138
/* Counterpart of mndInitDnode(); currently there is nothing to tear down. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
9✔
139

140
/*
 * Deploy hook of the SDB_DNODE table (registered as .deployFp in mndInitDnode).
 *
 * Builds the dnode object with id 1 from the local fqdn / server port, encodes it and commits it
 * through a "create-dnode" transaction, then adds the fqdn to the default user's IP white list.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    // Never report success on this path, even if tGetMachineId() returned 0 without an id.
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // Mark the raw READY *before* handing it to the transaction. Once the append succeeds the raw is
  // presumably owned by pTrans (the original nulled pRaw right after) — TODO confirm. A failure
  // after the append must therefore not reach sdbFreeRaw(pRaw) below, or it would be freed twice.
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
193

194
/*
 * Serializes a dnode object into a newly allocated SDB raw record (version TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved area, then two
 * zero-valued int16 fields. On failure the raw is released, terrno is left non-zero and NULL is
 * returned.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;  // code / lino are kept from the original; not referenced directly in this body
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
226

227
/*
 * Rebuilds a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1, two length-prefixed binary
 * fields follow the reserved area; they are read with a NULL destination. After decoding,
 * tmsgUpdateDnodeInfo() is given the id/fqdn/port and a non-zero result is logged as an endpoint
 * change. On failure the row is released and NULL is returned with terrno non-zero.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;  // code / lino are kept from the original; not referenced directly in this body
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Assume failure until decoding has finished.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    int16_t binLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &binLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, binLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &binLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, binLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
278

279
/*
 * SDB insert hook: marks the dnode as "status not received" and rebuilds its "fqdn:port"
 * endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the object.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
288

289
/* SDB delete hook: only traces the event; there is no extra state to release. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t result = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return result;
}
293

294
/*
 * SDB update hook: copies the mutable fields of pNew into the live object pOld.
 * updateTime is always copied; machineId only when TD_ENTERPRISE or TD_ASTRA_TODO is defined.
 * Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
302

303
/*
 * Acquires the dnode object with the given id from the SDB.
 *
 * Returns the object (to be handed back with mndReleaseDnode) or NULL. On NULL, terrno is
 * translated from the generic SDB code to the dnode-specific one:
 *   SDB_OBJ_NOT_THERE -> MND_DNODE_NOT_EXIST
 *   SDB_OBJ_CREATING  -> MND_DNODE_IN_CREATING
 *   SDB_OBJ_DROPPING  -> MND_DNODE_IN_DROPPING
 *   anything else     -> APP_ERROR (logged as fatal)
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    // Log while terrno still holds the underlying cause, then replace it with the generic code.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    terrno = TSDB_CODE_APP_ERROR;
  }

  return NULL;
}
321

322
/* Hands a dnode object obtained from this mnode's SDB back via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
195✔
326

327
/*
 * Builds an endpoint set from the dnode's fqdn and port.
 * The result code of addEpIntoEpSet() is stored in terrno; the set is returned by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = rc;
  return result;
}
332

333
/*
 * Looks up a dnode by id and returns its endpoint set.
 * If the dnode cannot be acquired, a zero-initialized set is returned.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pObj = mndAcquireDnode(pMnode, dnodeId);

  if (pObj != NULL) {
    result = mndGetDnodeEpset(pObj);
    mndReleaseDnode(pMnode, pObj);
  }

  return result;
}
343

344
/*
 * Scans the dnode table (sdbFetch) for an entry whose endpoint equals pEpStr, compared
 * case-insensitively over at most TSDB_EP_LEN characters.
 *
 * Returns the matching object still acquired (the iterator is cancelled), or NULL with
 * terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when the scan finds nothing.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SDnodeObj *pCandidate = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate)) != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      sdbCancelFetch(pSdb, pIter);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
364

365
/*
 * Same endpoint search as mndAcquireDnodeByEp(), but iterates with sdbFetchAll() and does not
 * touch terrno when nothing matches.
 *
 * Returns the matching object still acquired, or NULL.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus status = 0;

    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &status, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }
}
385

386
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
390

391
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
395

396
/*
 * Decides whether a dnode counts as online at time curMs.
 *
 * The dnode is online while |lastAccessTime - curMs| does not exceed 5000 * tsStatusInterval.
 * Side effect: when it is judged offline, has a positive rebootTime and is still flagged
 * DND_REASON_ONLINE, its offlineReason is switched to DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t silentMs = TABS(pDnode->lastAccessTime - curMs);
  const int64_t limitMs = 5000 * (int64_t)tsStatusInterval;

  if (silentMs <= limitMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
406

407
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every dnode returned by
 * sdbFetchAll(). A failed array push is logged and the iteration continues (best effort).
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the object, then release it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release; use the copied id instead.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
13✔
433

434
/*
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to pDnodeInfo for every
 * dnode returned by sdbFetchAll().
 *
 * Returns 0 on success. If an array push fails, the iteration is cancelled and terrno is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialize so fields not assigned below are not copied into the array uninitialized.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release; use the copied id instead.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
466

467
#define CHECK_MONITOR_PARA(para, err)                                                                    \
468
  if (pCfg->monitorParas.para != para) {                                                                 \
469
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
470
    terrno = err;                                                                                        \
471
    return err;                                                                                          \
472
  }
473

474
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
13✔
475
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
13!
476
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
13!
477
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
13!
478
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
13!
479
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
13!
480

481
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
13!
482
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
483
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
484
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
485
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
486
  }
487

488
  if (pCfg->statusInterval != tsStatusInterval) {
13!
489
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
490
           tsStatusInterval);
491
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
492
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
493
  }
494

495
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
13!
496
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
497
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
498
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
499
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
500
  }
501

502
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
13!
503
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
504
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
505
    return DND_REASON_LOCALE_NOT_MATCH;
×
506
  }
507

508
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
13!
509
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
510
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
511
    return DND_REASON_CHARSET_NOT_MATCH;
×
512
  }
513

514
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
13!
515
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
516
           tsTtlChangeOnWrite);
517
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
518
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
519
  }
520
  int8_t enable = tsEnableWhiteList ? 1 : 0;
13✔
521
  if (pCfg->enableWhiteList != enable) {
13!
522
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
523
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
524
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
525
  }
526

527
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
13!
528
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
13!
529
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
530
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
531
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
532
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
533
  }
534

535
  return DND_REASON_ONLINE;
13✔
536
}
537

538
/*
 * Average number of items applied per millisecond between two samples.
 *
 * Returns (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0 when either the
 * time or the count did not strictly increase.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    return (double)(currentCount - lastCount) / (double)(currentTimeMs - lastTimeMs);
  }
  return 0.0;
}
548

549
/*
 * Folds the vnode load reported by a status message into the vgroup member entry pGid.
 *
 * - While commit index > applied index, the applied-rate bookkeeping is refreshed
 *   (appliedRate / lastSyncAppliedIndexUpdateTime).
 * - syncAppliedIndex and syncCommitIndex are always copied.
 * - When the role (syncState, syncTerm unless reported as -1, roleTimeMs), syncRestore,
 *   syncCanRead or startTimeMs differ, those fields are copied and the change is logged.
 *
 * Returns true if that last group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleChanged =
      (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First sample: only remember when it was taken.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);

      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;

  const bool otherChanged = (pGid->syncRestore != pVload->syncRestore) ||
                            (pGid->syncCanRead != pVload->syncCanRead) ||
                            (pGid->startTimeMs != pVload->startTimeMs);
  if (!roleChanged && !otherChanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
586

587
/*
 * Folds the mnode load reported by a status message into the mnode object.
 *
 * When the role (syncState, syncTerm unless reported as -1, roleTimeMs) or syncRestore differ,
 * syncState / syncTerm / syncRestore / roleTimeMs are copied from pMload and the change is logged.
 *
 * Returns true if the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleChanged =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
604

605
extern char   *tsMonFwUri;
606
extern char   *tsMonSlowLogUri;
607
/*
 * Handler for TDMT_MND_STATIS: deserializes the request and forwards its content with
 * monSendContent() — counters to tsMonFwUri, slow logs to tsMonSlowLogUri. Other types are
 * dropped.
 *
 * Returns 0 on success, or the deserialization error code.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  int32_t code = tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq);
  if (code != 0) {
    // Release whatever a partially decoded request may hold. statisReq starts zeroed;
    // NOTE(review): assumes tFreeSStatisReq tolerates unset fields — confirm.
    tFreeSStatisReq(&statisReq);
    TAOS_RETURN(code);
  }

  if (tsMonitorLogProtocol) {
    // Avoid passing NULL to "%s" if the request carried no content.
    mInfo("process statis req,\n %s", statisReq.pCont != NULL ? statisReq.pCont : "");
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
627

628
/*
 * Handler for TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are set, the request
 * is deserialized and recorded via auditAddRecord(); otherwise it is ignored.
 *
 * Returns 0 on success (including the ignored case), or the deserialization error code.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  int32_t code = tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq);
  if (code != 0) {
    // Release whatever a partially decoded request may hold. auditReq starts zeroed;
    // NOTE(review): assumes tFreeSAuditReq tolerates unset fields — confirm.
    tFreeSAuditReq(&auditReq);
    TAOS_RETURN(code);
  }

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
645

646
/*
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message (dnode id + machine id) on the mnode write queue so
 * that the dnode object is updated by mndProcessUpdateDnodeInfoReq.
 *
 * Returns 0 on success, otherwise an error code (also logged when negative).
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the required length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so it must be released here. A zero length is
    // mapped to an error as in the sizing pass instead of being reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): presumably tmsgPutToQueue releases pCont itself when it fails — confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
676

677
/*
 * Handler for TDMT_MND_UPDATE_DNODE_INFO: refreshes the dnode's updateTime and persists the object
 * through an "update-dnode-obj" transaction.
 *
 * Returns 0 on success, otherwise an error code (also logged together with the failing line).
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  // Mark the raw READY *before* handing it to the transaction. Once the append succeeds the raw is
  // presumably owned by pTrans (the original nulled pCommitRaw right after) — TODO confirm. A
  // failure after the append must therefore not reach sdbFreeRaw(pCommitRaw) below, or it would be
  // freed twice.
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
723

724
/*
 * Handle a status (heartbeat) message sent by a dnode.
 *
 * Steps, in order:
 *   1. Deserialize the SStatusReq and reject a mismatching cluster id.
 *   2. Locate the dnode object, by endpoint when the request carries dnodeId 0,
 *      otherwise by id with endpoint-based fallbacks.
 *   3. Copy the reported vnode / mnode / qnode load into the in-memory objects.
 *   4. When `needCheck` is set (dnode offline, rebooted, or any of the tracked
 *      versions/settings differ), validate version, cluster id and cluster
 *      configuration, refresh the dnode's attributes and build an SStatusRsp
 *      that is attached to pReq->info.rsp.
 *   5. Update the access counters of the dnode.
 *
 * NOTE(review): the local `code` is assigned on several paths but the function
 * returns the result of mndUpdClusterInfo(pReq); failures are reported through
 * terrno / the dnode's offlineReason. Confirm this is intended.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode does not know its id yet: look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      // Keep the lookup error; the fallback lookups below may overwrite terrno.
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  // A full check (and a response) is needed whenever anything the dnode
  // caches may be stale or the dnode was not considered online.
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Apply the per-vnode load reported by this dnode.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the (assigned) leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    // pDnode is known to be non-NULL here: it was dereferenced above.
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only computes the encoded size; do not allocate from a
    // negative (error) length.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never handed to pReq, so it must be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
945

946
/*
 * Handle a notify message from a dnode.
 *
 * After deserializing the SNotifyReq and checking that it belongs to this
 * cluster, the time-series count of every reported vgroup is copied into the
 * matching vgroup object and mndUpdClusterInfo() is invoked.
 *
 * Returns the deserialization error (also stored in terrno),
 * TSDB_CODE_MND_DNODE_DIFF_CLUSTER for a foreign cluster id, or otherwise the
 * result of mndUpdClusterInfo().  The request is always freed before return.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);

    if (notifyReq.clusterId != 0 && notifyReq.clusterId != localClusterId) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
            notifyReq.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
      int32_t idx = 0;

      while (idx < numOfLoads) {
        SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
        SVgObj         *pVgObj = mndAcquireVgroup(pMnode, pLoad->vgId);

        // Vgroups unknown to this mnode are silently skipped.
        if (pVgObj != NULL) {
          pVgObj->numOfTimeSeries = pLoad->nTimeSeries;
          mndReleaseVgroup(pMnode, pVgObj);
        }
        ++idx;
      }

      code = mndUpdClusterInfo(pReq);
    }
  }

  tFreeSNotifyReq(&notifyReq);
  return code;
}
979

980
/*
 * Create a new dnode object from a create-dnode request and submit it
 * through a "create-dnode" transaction.
 *
 * pMnode   mnode that owns the sdb and the transaction.
 * pReq     originating RPC message, attached to the transaction.
 * pCreate  deserialized request; its fqdn and port describe the new dnode.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  // Format the endpoint with the full buffer and the same "%s:%d" layout that
  // mndProcessCreateDnodeReq uses for its duplicate lookup, so the stored ep
  // always equals the key it is later searched by (the previous bound of
  // TSDB_EP_LEN - 1 cut one more character than necessary).
  (void)snprintf(dnodeObj.ep, sizeof(dnodeObj.ep), "%s:%d", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the sdbFreeRaw() at _OVER does not touch the raw once it has
  // been appended to the transaction.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value
_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1022

1023
/*
 * Answer a dnode-list request.
 *
 * Every dnode in the sdb contributes one single-endpoint SEpSet (fqdn + port)
 * to an SDnodeListRsp, which is serialized into a buffer obtained from
 * rpcMallocCont() and attached to pReq->info.rsp / rspLen.
 *
 * Returns 0 on success, otherwise an error code (also logged).  On failure no
 * response buffer is left allocated.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  int32_t       rspLen = 0;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SEpSet epSet = {0};
    epSet.numOfEps = 1;
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: release the row and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded size; never allocate from an error
  // value, and never report success for an empty encoding.
  rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
  if (rspLen <= 0) {
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // now referenced by pReq; must not be freed below
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  // Only non-NULL when serialization failed after the allocation.
  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1083

1084
void getSlowLogScopeString(int32_t scope, char *result) {
×
1085
  if (scope == SLOW_LOG_TYPE_NULL) {
×
1086
    (void)strncat(result, "NONE", 64);
×
1087
    return;
×
1088
  }
1089
  while (scope > 0) {
×
1090
    if (scope & SLOW_LOG_TYPE_QUERY) {
×
1091
      (void)strncat(result, "QUERY", 64);
×
1092
      scope &= ~SLOW_LOG_TYPE_QUERY;
×
1093
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1094
      (void)strncat(result, "INSERT", 64);
×
1095
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1096
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1097
      (void)strncat(result, "OTHERS", 64);
×
1098
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1099
    } else {
1100
      (void)printf("invalid slow log scope:%d", scope);
×
1101
      return;
×
1102
    }
1103

1104
    if (scope > 0) {
×
1105
      (void)strncat(result, "|", 64);
×
1106
    }
1107
  }
1108
}
1109

1110
/*
 * Handle a CREATE DNODE request.
 *
 * Order of checks: grants (dnode count, CPU cores), request deserialization,
 * operator privilege, endpoint validity, and finally that no dnode with the
 * same "fqdn:port" endpoint exists.  When all pass, mndCreateDnode() submits
 * the transaction and an audit record is written.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was submitted,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  char            dnodeEp[TSDB_EP_LEN];
  char            auditObj[200] = {0};

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  // The endpoint needs a non-empty fqdn and a port in 1..UINT16_MAX.
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  (void)snprintf(dnodeEp, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, dnodeEp);
  if (pDnode) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // The audit record is written whether or not the creation was accepted.
  (void)tsnprintf(auditObj, sizeof(auditObj), "%s:%d", createReq.fqdn, createReq.port);
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", auditObj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1158

1159
// Actual handler for restore-dnode requests.  When TD_ENTERPRISE is not
// defined this file provides the no-op definition further below; otherwise the
// definition is expected to come from another translation unit.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests: forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing, reports success.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1166

1167
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
×
1168
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1169
  int32_t  code = -1;
×
1170
  SSdbRaw *pRaw = NULL;
×
1171
  STrans  *pTrans = NULL;
×
1172

1173
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
×
1174
  if (pTrans == NULL) {
×
1175
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1176
    if (terrno != 0) code = terrno;
×
1177
    goto _OVER;
×
1178
  }
1179
  mndTransSetSerial(pTrans);
×
1180
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
×
1181
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1182

1183
  pRaw = mndDnodeActionEncode(pDnode);
×
1184
  if (pRaw == NULL) {
×
1185
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1186
    if (terrno != 0) code = terrno;
×
1187
    goto _OVER;
×
1188
  }
1189
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
×
1190
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
×
1191
  pRaw = NULL;
×
1192

1193
  pRaw = mndDnodeActionEncode(pDnode);
×
1194
  if (pRaw == NULL) {
×
1195
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1196
    if (terrno != 0) code = terrno;
×
1197
    goto _OVER;
×
1198
  }
1199
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
×
1200
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
×
1201
  pRaw = NULL;
×
1202

1203
  if (pMObj != NULL) {
×
1204
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1205
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
×
1206
  }
1207

1208
  if (pQObj != NULL) {
×
1209
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1210
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
×
1211
  }
1212

1213
  if (pSObj != NULL) {
×
1214
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1215
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
×
1216
  }
1217

1218
  if (numOfVnodes > 0) {
×
1219
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
×
1220
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
×
1221
  }
1222

1223
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1224

1225
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
×
1226
                                   1);  // TODO: check the return value
1227
  code = 0;
×
1228

1229
_OVER:
×
1230
  mndTransDrop(pTrans);
×
1231
  sdbFreeRaw(pRaw);
×
1232
  TAOS_RETURN(code);
×
1233
}
1234

1235
/*
 * Tell whether a dnode hosts nothing.
 *
 * Returns true only when no qnode, snode or mnode can be acquired for
 * `dnodeId` and mndGetVnodesNum() reports no vnodes on it.  The lookups are
 * made in that order and stop at the first component found.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       isEmpty = false;
  SMnodeObj *pMObj = NULL;
  SQnodeObj *pQObj = NULL;
  SSnodeObj *pSObj = NULL;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSObj == NULL) {
      pMObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        isEmpty = !(vnodeCount > 0);
      }
    }
  }

  // Objects that were never acquired are still NULL at this point.
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1260

1261
/*
 * Handle a DROP DNODE request.
 *
 * The dnode is looked up by id and, failing that, by the "fqdn:port" endpoint
 * carried in the request.  The drop is refused when
 *   - the dnode hosts the only mnode, or the mnode of this very node,
 *   - the dnode is online and a forced drop was requested,
 *   - the dnode is offline, not empty, and the drop is not forced.
 * `unsafe` implies `force`.  Otherwise mndDropDnode() submits the transaction
 * and an audit record is written.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was submitted,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked is MND_OPER_DROP_MNODE although this
  // handler drops a dnode - confirm whether a dnode-specific operation exists.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Keep the lookup error; the endpoint lookup below may overwrite terrno.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // The fqdn comes from the request and must never be used as the format
    // string itself; build the "fqdn:port" key explicitly.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      // Never report success when nothing was found.
      code = (err != 0) ? err : TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }
  }

  // Use the id of the dnode actually found: when it was resolved by endpoint
  // the id in the request is not the dnode's id.
  pQObj = mndAcquireQnode(pMnode, pDnode->id);
  pSObj = mndAcquireSnode(pMnode, pDnode->id);
  pMObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == pDnode->id) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record names the dnode id as it appeared in the request.
  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1345

1346
/*
 * Distribute a "create encrypt key" request to dnodes.
 *
 * pReq      originating RPC message (used for logging and to reach the mnode).
 * dnodeId   target dnode; -1 or 0 address every dnode.
 * pDcfgReq  request forwarded to the dnodes; its value is the key.
 *
 * The key length is validated, then the mnode's `encrypting` flag is taken so
 * that only one distribution runs at a time.  Each dnode whose offlineReason
 * is DND_REASON_ONLINE receives a TDMT_DND_CREATE_ENCRYPT_KEY message and
 * nEncrypt counts the messages that were sent.  When none was sent the
 * `encrypting` flag is released again before returning.
 *
 * Returns 0 on success, otherwise an error code (also stored in terrno when
 * terrno was not already set).
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Take the flag; a non-zero previous value means a distribution is running.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      // First pass only computes the encoded size; never allocate from an
      // error or empty length.
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = (bufLen > 0) ? rpcMallocCont(bufLen) : NULL;

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = (bufLen < 0) ? bufLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
          // Abandon the iteration cleanly: free the unsent buffer, release the
          // row and cancel the fetch.  Falling out of the loop (instead of
          // jumping to _exit) lets the check below release the `encrypting`
          // flag when nothing has been dispatched.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No response will ever arrive to clear the flag if nothing was sent.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1423

1424
/*
 * Handle a request to create the encryption key.
 *
 * Only active when TD_ENTERPRISE or TD_ASTRA_TODO is defined; otherwise the
 * function returns 0 without doing anything.
 *
 * The SMCfgDnodeReq is deserialized, the CONFIG_DNODE privilege is checked and,
 * when the config item is "encrypt_key", the item and its value are handed to
 * mndProcessCreateEncryptKeyReqImpl().  Any other item yields
 * TSDB_CODE_PAR_INTERNAL_ERROR.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }
  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    // Copy everything still needed out of cfgReq before releasing it, so that
    // nothing is read from the request after tFreeSMCfgDnodeReq().
    int32_t dnodeId = cfgReq.dnodeId;
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1453

1454
/*
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * The response is counted as success (pRsp->code == 0) or failure.  Once the
 * number of counted responses reaches the number of requests that were sent
 * (encryptMgmt.nEncrypt), the mnode's `encrypting` flag is cleared.
 *
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Bump the counter for this response and read the other one as well:
  // completion depends on the sum of both, not only on the one just updated.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1478

1479
/*
 * Fill pBlock with one (option name, value) row for each configuration item listed
 * below, rendered from the current values of the corresponding global variables.
 *
 * The `rows` argument is not consulted: every item is written in a single call.
 * A colDataSetVal failure is logged and stops the output early.
 *
 * @return number of rows written to pBlock; the same amount is added to
 *         pShow->numOfRows.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nItems = 0;
  int32_t nWritten = 0;
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  /* Collect the option names together with their textual values. */
  names[nItems] = "statusInterval";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
  ++nItems;

  names[nItems] = "timezone";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  ++nItems;

  names[nItems] = "locale";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  ++nItems;

  names[nItems] = "charset";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  ++nItems;

  names[nItems] = "monitor";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  ++nItems;

  names[nItems] = "monitorInterval";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  ++nItems;

  names[nItems] = "slowLogThreshold";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  ++nItems;

  names[nItems] = "slowLogMaxLen";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  ++nItems;

  char scope[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scope);
  names[nItems] = "slowLogScope";
  (void)snprintf(values[nItems], TSDB_CONFIG_VALUE_LEN, "%s", scope);
  ++nItems;

  /* Scratch buffers holding the varstr-encoded name and value of the current row. */
  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t row = 0; row < nItems; ++row) {
    SColumnInfoData *pCol = NULL;

    /* Column 0: option name. */
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, names[row], TSDB_CONFIG_OPTION_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, row, (const char *)nameVar, false), &lino, _OVER);

    /* Column 1: option value. */
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, values[row], TSDB_CONFIG_VALUE_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, row, (const char *)valueVar, false), &lino, _OVER);

    nWritten = row + 1;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nWritten;
  return nWritten;
}
1550

1551
/*
 * Cancel hook counterpart of mndRetrieveConfigs. Intentionally empty: both
 * arguments are ignored and nothing is released.
 */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1552

1553
/*
 * Fill pBlock with up to `rows` rows describing the dnodes stored in the sdb.
 *
 * Iteration state is kept in pShow->pIter so that a later call continues where this
 * one stopped. Columns are written in this order: id, endpoint, vnode count,
 * supported vnode count, status, create time, reboot time, offline reason and, in
 * enterprise builds, machine id.
 *
 * @param pReq    request message; pReq->info.node is the owning mnode.
 * @param pShow   show object carrying the iterator, schema and row counter.
 * @param pBlock  destination data block.
 * @param rows    maximum number of rows to produce in this call.
 * @return number of rows written; the same amount is added to pShow->numOfRows.
 *         On a column-write or allocation failure the error is logged and the rows
 *         completed so far are returned.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    // dnode id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // endpoint, bounded by the schema width of this column
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // number of vnodes currently on this dnode
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    // number of vnodes this dnode supports
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // status text; a trailing '*' marks a creating/dropping dnode that is not online
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // create time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    // reboot time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // offline reason (empty while the dnode is online); the buffer is sized for the
    // string that is actually written
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Route the failure through the same macro as every other error in this loop so
      // the current dnode gets the identical cleanup before leaving.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary buffer before checking the result: the check may jump to
    // _OVER, which would otherwise leak it.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    // machine id
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1635

1636
/*
 * Cancel an in-progress dnode iteration: hands pIter to sdbCancelFetchByType for
 * the SDB_DNODE table of this mnode's sdb.
 */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1640

1641
/*
 * Collect a copy of the fqdn of every dnode stored in the sdb.
 *
 * Each element of the returned array is a `char *` produced by taosStrdup.
 * NOTE(review): the caller presumably owns both the strings and the array and must
 * free them - confirm against the call sites.
 *
 * Collection is best effort: a dnode whose fqdn cannot be duplicated or appended is
 * skipped with an error log rather than aborting the whole scan.
 *
 * @param pMnode  mnode whose sdb is scanned.
 * @return the array of fqdn strings (possibly empty), or NULL if the array itself
 *         could not be allocated.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdn array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Never store a NULL entry: consumers of the array expect valid strings.
      mError("failed to dup fqdn, but continue at this time");
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the string, so release it here to avoid a leak.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc