• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.33
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndCluster.h"
21
#include "mndDb.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tjson.h"
32
#include "tmisce.h"
33
#include "tunit.h"
34

35
#define TSDB_DNODE_VER_NUMBER   2
36
#define TSDB_DNODE_RESERVE_SIZE 40
37

38
/*
 * Human-readable descriptions of why a dnode is considered offline.
 * NOTE(review): presumably indexed by the DND_REASON_* value kept in
 * SDnodeObj.offlineReason — confirm the order against that enum.
 */
static const char *offlineReason[] = {
    [0] = "",
    [1] = "status msg timeout",
    [2] = "status not received",
    [3] = "version not match",
    [4] = "dnodeId not match",
    [5] = "clusterId not match",
    [6] = "statusInterval not match",
    [7] = "timezone not match",
    [8] = "locale not match",
    [9] = "charset not match",
    [10] = "ttlChangeOnWrite not match",
    [11] = "enableWhiteList not match",
    [12] = "encryptionKey not match",
    [13] = "monitor not match",
    [14] = "monitor switch not match",
    [15] = "monitor interval not match",
    [16] = "monitor slow log threshold not match",
    [17] = "monitor slow log sql max len not match",
    [18] = "monitor slow log scope not match",
    [19] = "unknown",
};
60

61
/* Kinds of activation code; values are the implicit enum ordinals made explicit. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
65

66
/* Dnode operation kinds; values are the implicit enum ordinals made explicit. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
71

72
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
73
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
74
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
75
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
76
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
78
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
79
static int32_t  mndProcessShowVariablesReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
100

101
#ifdef _GRANT
102
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
103
#else
104
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
105
#endif
106

107
/*
 * Wires the dnode module into the mnode: installs the request/response
 * message handlers, the SHOW retrieve / iterator-free callbacks, and finally
 * registers the SDB_DNODE table callbacks.
 *
 * Returns whatever sdbSetTable() returns for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Message handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  // SHOW table callbacks.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // SDB table descriptor for dnode rows, keyed by an int32 id.
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
141

142
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
143
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
144
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
145
/* Cleanup counterpart of mndInitDnode(); there is currently nothing to release. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
715✔
146

147
/*
 * Deploy callback for the SDB_DNODE table: builds the first dnode (id 1) from
 * the local fqdn/port and commits it through a "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise a non-zero error code. On every failure
 * path the returned code now reflects the failure that was detected; the
 * enterprise "no machine id" path previously set terrno but returned the
 * value produced by tGetMachineId(), which could disagree with terrno.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // A machine id is mandatory here: report the failure through both terrno
    // and the return value so the caller cannot mistake it for success.
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // The raw is attached to the transaction's commit log; clear the local
  // pointer so the cleanup below does not free it a second time.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
200

201
/*
 * Serializes a dnode object into a newly allocated SDB raw record
 * (SDB_DNODE, version TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved
 * area, then two zero int16 values kept for forward/backward compatibility.
 *
 * Returns the raw on success; on failure logs, frees the raw and returns NULL
 * with terrno left non-zero.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  // `code` and `lino` are kept in scope for the SDB_SET_* macros.
  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       dataPos = 0;
  const int32_t rawSize = sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE;

  // Pessimistic default: any early exit through _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
233

234
/*
 * Rebuilds a dnode row from an SDB raw record written by the encoder.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed binary fields follow the reserved area; they are skipped
 * (decoded into a NULL destination).
 *
 * Returns the allocated row on success; on failure logs, frees the row and
 * returns NULL with terrno left non-zero.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  // `code` and `lino` are kept in scope for the SDB_GET_* macros.
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Pessimistic default: any early exit through _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (!(sver >= 1 && sver <= TSDB_DNODE_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver >= 2) {
    // Two length-prefixed blobs that this version reads past without storing.
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
285

286
/*
 * SDB insert callback: marks the dnode as "status not received" and derives
 * its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  // No status message has been seen yet for a freshly inserted row.
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the row.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
295

296
/* SDB delete callback: only traces the deletion. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t rc = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return rc;
}
300

301
/*
 * SDB update callback: carries the new update time over to the stored row
 * and, in enterprise builds, the machine id as well. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#ifdef TD_ENTERPRISE
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
309

310
/*
 * Acquires the dnode with the given id from the SDB.
 *
 * Returns the acquired object (release it with mndReleaseDnode()), or NULL on
 * failure. On failure terrno is translated from the SDB object-state codes to
 * the dnode-specific ones (not exist / in creating / in dropping); any other
 * SDB error is reported as TSDB_CODE_APP_ERROR and logged as fatal.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
      // The message used to say "acquire db", a copy-paste slip: this is the dnode path.
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    }
  }

  return pDnode;
}
328

329
/* Hands a dnode obtained from the SDB back to it via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
1,827,427✔
333

334
/*
 * Builds an endpoint set holding the dnode's fqdn/port.
 * The result of addEpIntoEpSet() is stored in terrno; the set is returned
 * regardless of that result.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet  result = {0};
  int32_t addRc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = addRc;
  return result;
}
339

340
/*
 * Looks up a dnode by id and returns its endpoint set.
 * Returns a zeroed SEpSet when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pFound = mndAcquireDnode(pMnode, dnodeId);

  if (pFound != NULL) {
    result = mndGetDnodeEpset(pFound);
    mndReleaseDnode(pMnode, pFound);
  }

  return result;
}
350

351
/*
 * Scans the dnode table (sdbFetch) for an entry whose endpoint string equals
 * pEpStr, compared case-insensitively over at most TSDB_EP_LEN bytes.
 *
 * Returns the matching dnode, still acquired, or NULL with terrno set to
 * TSDB_CODE_MND_DNODE_NOT_EXIST when the scan finds nothing.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCand = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCand);
    if (pIter == NULL) {
      // Scan exhausted without a match.
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (strncasecmp(pEpStr, pCand->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCand);
      continue;
    }

    // Match: stop the iteration and hand the still-acquired object to the caller.
    sdbCancelFetch(pSdb, pIter);
    return pCand;
  }
}
371

372
/*
 * Same endpoint lookup as the sdbFetch-based variant, but iterates with
 * sdbFetchAll() so the per-object status is reported alongside each entry.
 * NOTE(review): presumably this also visits objects that are being created or
 * dropped — confirm against sdbFetchAll().
 *
 * Returns the matching dnode, still acquired, or NULL when none matches
 * (terrno is not modified in that case).
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCand = NULL;
    ESdbStatus status = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCand, &status, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (strncasecmp(pEpStr, pCand->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCand);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCand;
  }
}
392

393
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
397

398
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
402

403
/*
 * Decides whether a dnode counts as online at time curMs.
 *
 * The dnode is online while |lastAccessTime - curMs| does not exceed
 * 5000 * tsStatusInterval. When the limit is exceeded and the dnode had been
 * marked online after a recorded reboot time, its offline reason is switched
 * to "status msg timeout" as a side effect.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t elapsed = TABS(pDnode->lastAccessTime - curMs);
  const int64_t limit = 5000 * (int64_t)tsStatusInterval;

  if (elapsed <= limit) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
413

414
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * A failed array push is logged and the iteration continues, so the output
 * may be incomplete; this function has no way to report that to the caller.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the object before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
1,753✔
440

441
/*
 * Appends one SDnodeInfo (id, fqdn, port, offline reason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * Returns 0 on success. If an array push fails, the iteration is cancelled
 * and the terrno value observed at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialize so that no indeterminate bytes (padding or fields not
    // assigned below) are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
473

474
#define CHECK_MONITOR_PARA(para,err) \
475
if (pCfg->monitorParas.para != para) { \
476
  mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
477
  terrno = err; \
478
  return err;\
479
}
480

481
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
1,753✔
482
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
1,753!
483
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
1,753!
484
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
1,753!
485
  CHECK_MONITOR_PARA(tsSlowLogThresholdTest, DND_REASON_STATUS_MONITOR_NOT_MATCH);
1,753!
486
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
1,753!
487
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
1,753!
488

489
  if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
1,753!
490
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
×
491
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
492
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
493
  }
494

495
  if (pCfg->statusInterval != tsStatusInterval) {
1,753!
496
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
497
           tsStatusInterval);
498
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
499
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
500
  }
501

502
  if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
1,753!
503
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
504
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
505
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
506
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
507
  }
508

509
  if (0 != strcasecmp(pCfg->locale, tsLocale)) {
1,753!
510
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
511
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
512
    return DND_REASON_LOCALE_NOT_MATCH;
×
513
  }
514

515
  if (0 != strcasecmp(pCfg->charset, tsCharset)) {
1,753!
516
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
517
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
518
    return DND_REASON_CHARSET_NOT_MATCH;
×
519
  }
520

521
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
1,753!
522
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
523
           tsTtlChangeOnWrite);
524
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
525
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
526
  }
527
  int8_t enable = tsEnableWhiteList ? 1 : 0;
1,753✔
528
  if (pCfg->enableWhiteList != enable) {
1,753!
529
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
530
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
531
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
532
  }
533

534
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
1,753!
535
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
1,753!
536
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
537
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
538
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
539
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
540
  }
541

542
  return DND_REASON_ONLINE;
1,753✔
543
}
544

545
/*
 * Synchronizes the cached vnode replica state (pGid) with the load reported
 * in a status message (pVload).
 *
 * A change is detected when the sync state, the role time, the restore flag,
 * the can-read flag or the start time differ, or when the reported term is
 * not -1 and differs from the cached one. On a change all six fields are
 * copied and the transition is logged.
 *
 * Returns true when the cached state was updated, false otherwise.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  const bool roleChanged = pGid->syncState != pVload->syncState ||
                           (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
                           pGid->roleTimeMs != pVload->roleTimeMs;
  const bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                       pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
567

568
/*
 * Synchronizes the cached mnode sync state (pObj) with the load reported in a
 * status message (pMload).
 *
 * A change is detected when the sync state, the role time or the restore flag
 * differ, or when the reported term is not -1 and differs from the cached
 * one. On a change the four fields are copied and the transition is logged.
 *
 * Returns true when the cached state was updated, false otherwise.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool roleChanged = pObj->syncState != pMload->syncState ||
                           (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
                           pObj->roleTimeMs != pMload->roleTimeMs;
  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
585

586
extern char* tsMonFwUri;
587
extern char* tsMonSlowLogUri;
588
/*
 * Handles TDMT_MND_STATIS: deserializes the statistics request and forwards
 * its content with monSendContent() — counter payloads to tsMonFwUri,
 * slow-log payloads to tsMonSlowLogUri. Other types are not forwarded.
 *
 * Returns the deserialization error if decoding fails, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1;
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  // Optional protocol-level logging of the raw payload.
  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
608

609
/*
 * Handles TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserializes the audit request and records it via auditAddRecord().
 *
 * Returns the deserialization error if decoding fails, otherwise 0 (also when
 * auditing is disabled and the request is ignored).
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
626

627
/*
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue that
 * carries the dnode's id and machine id.
 *
 * Returns 0 when the message was queued, otherwise a negative error code.
 *
 * Changes relative to the previous version:
 *  - the RPC buffer is freed when the second serialization pass fails
 *    (it used to leak);
 *  - a zero-length serialization result is reported as an error instead of
 *    being returned as success without sending anything.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it must be released here.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  // NOTE(review): assumes tmsgPutToQueue() takes ownership of pReq, including
  // when it fails — confirm against its implementation.
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
657

658
/*
 * Handles TDMT_MND_UPDATE_DNODE_INFO: refreshes the update time of the dnode
 * named in the request and persists the dnode object through an
 * "update-dnode-obj" transaction.
 *
 * Returns 0 on success, otherwise the error code of the failing step. The
 * dnode reference, the transaction and any raw not yet attached to the
 * transaction are released on every path.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  // `code` and `lino` are used by the TAOS_CHECK_EXIT macro.
  int32_t       code = 0;
  int32_t       lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;
  SDnodeInfoReq infoReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // The raw is attached to the transaction now; clear the local pointer so the
  // cleanup below does not free it again.
  pCommitRaw = NULL;

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
704

705
// Handles a periodic status message sent by a dnode.
//
// Steps, in order:
//   1. Deserialize the request and reject it if it carries a non-zero cluster
//      id different from this cluster's id.
//   2. Resolve the dnode object, by id or (for id 0 / unknown id) by endpoint.
//   3. Copy vnode, mnode and qnode load figures from the request into the
//      corresponding in-memory objects.
//   4. If anything relevant changed (needCheck), validate version, cluster id
//      and cluster configuration, refresh the dnode's attributes and build a
//      SStatusRsp that is attached to pReq->info.rsp.
//   5. Bump the dnode's access counter and last-access timestamp.
//
// NOTE: the function's return value is always the result of
// mndUpdClusterInfo(pReq); `code` and `terrno` assigned on the error paths are
// not returned from here.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode does not know its id yet: it can only be found by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // Endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
          pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Fold the per-vnode load reports into the vgroup objects.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only a (assigned) leader's figures are copied into the vgroup.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    // pDnode is non-NULL here: it has already been dereferenced above.
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only measures the encoded size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Without this check a NULL buffer would be published with a non-zero rspLen.
      taosArrayDestroy(statusRsp.pDnodeEps);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never attached to pReq, so it must be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
922

923
// Handles a notify message from a dnode: copies the reported time-series count
// of every listed vnode into its vgroup object, then refreshes the cluster info.
//
// Returns the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER when the
// sender's non-zero cluster id differs from ours, or otherwise the result of
// mndUpdClusterInfo(). The request is freed on every path.
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  int64_t clusterid = mndGetClusterId(pMnode);
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t idx = 0; idx < numOfLoads; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    SVgObj         *pVgObj = mndAcquireVgroup(pMnode, pLoad->vgId);

    // Vgroups that cannot be acquired are simply skipped.
    if (pVgObj == NULL) {
      continue;
    }

    pVgObj->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVgObj);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
956

957
// Creates a new dnode object from a create request and submits it through a
// "create-dnode" transaction (commit log only, status READY). After the
// transaction is prepared, the dnode's fqdn is added to the IP white list of
// all users; the result of that call is currently ignored.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  SSdbRaw  *pCommitRaw = NULL;
  STrans   *pTrans = NULL;
  SDnodeObj dnodeObj = {0};

  // Fill in the object that will be persisted.
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (NULL == pTrans) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&dnodeObj);
  if (NULL == pCommitRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the sdbFreeRaw() at _OVER leaves the appended raw object alone.
  pCommitRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  // TODO: check the return value
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  return code;
}
999

1000
// Handles a dnode-list request: collects one single-endpoint SEpSet (fqdn and
// port) per dnode found in the sdb, serializes the list and attaches it to
// pReq->info.rsp.
//
// Returns 0 on success. On failure returns a non-zero code (the failing call's
// code, terrno, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails without
// setting terrno) and no response buffer is attached or leaked.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    // Never report success for a failed allocation, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SEpSet epSet = {0};
    epSet.numOfEps = 1;
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: drop the row and cancel the open fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only measures the encoded size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    code = rspLen;
    // The buffer was never attached to pReq, so it must be released here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1060

1061
static void getSlowLogScopeString(int32_t scope, char* result){
15✔
1062
  if(scope == SLOW_LOG_TYPE_NULL) {
15!
1063
    (void)strcat(result, "NONE");
×
1064
    return;
×
1065
  }
1066
  while(scope > 0){
30✔
1067
    if(scope & SLOW_LOG_TYPE_QUERY) {
15!
1068
      (void)strcat(result, "QUERY");
15✔
1069
      scope &= ~SLOW_LOG_TYPE_QUERY;
15✔
1070
    } else if(scope & SLOW_LOG_TYPE_INSERT) {
×
1071
      (void)strcat(result, "INSERT");
×
1072
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1073
    } else if(scope & SLOW_LOG_TYPE_OTHERS) {
×
1074
      (void)strcat(result, "OTHERS");
×
1075
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1076
    } else{
1077
      (void)printf("invalid slow log scope:%d", scope);
×
1078
      return;
×
1079
    }
1080

1081
    if(scope > 0) {
15!
1082
      (void)strcat(result, "|");
×
1083
    }
1084
  }
1085
}
1086

1087
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
8✔
1088
  SShowVariablesRsp rsp = {0};
8✔
1089
  int32_t           code = -1;
8✔
1090

1091
  if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIABLES) != 0) {
8!
1092
    goto _OVER;
×
1093
  }
1094

1095
  rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
8✔
1096
  if (NULL == rsp.variables) {
8!
1097
    mError("failed to alloc SVariablesInfo array while process show variables req");
×
1098
    code = terrno;
×
1099
    goto _OVER;
×
1100
  }
1101

1102
  SVariablesInfo info = {0};
8✔
1103

1104
  (void)strcpy(info.name, "statusInterval");
8✔
1105
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
8✔
1106
  (void)strcpy(info.scope, "server");
8✔
1107
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1108
    code = terrno;
×
1109
    goto _OVER;
×
1110
  }
1111

1112
  (void)strcpy(info.name, "timezone");
8✔
1113
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
8✔
1114
  (void)strcpy(info.scope, "both");
8✔
1115
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1116
    code = terrno;
×
1117
    goto _OVER;
×
1118
  }
1119

1120
  (void)strcpy(info.name, "locale");
8✔
1121
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
8✔
1122
  (void)strcpy(info.scope, "both");
8✔
1123
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1124
    code = terrno;
×
1125
    goto _OVER;
×
1126
  }
1127

1128
  (void)strcpy(info.name, "charset");
8✔
1129
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
8✔
1130
  (void)strcpy(info.scope, "both");
8✔
1131
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1132
    code = terrno;
×
1133
    goto _OVER;
×
1134
  }
1135

1136
  (void)strcpy(info.name, "monitor");
8✔
1137
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
8✔
1138
  (void)strcpy(info.scope, "server");
8✔
1139
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1140
    code = terrno;
×
1141
    goto _OVER;
×
1142
  }
1143

1144
  (void)strcpy(info.name, "monitorInterval");
8✔
1145
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
8✔
1146
  (void)strcpy(info.scope, "server");
8✔
1147
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1148
    code = terrno;
×
1149
    goto _OVER;
×
1150
  }
1151

1152
  (void)strcpy(info.name, "slowLogThreshold");
8✔
1153
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
8✔
1154
  (void)strcpy(info.scope, "server");
8✔
1155
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1156
    code = terrno;
×
1157
    goto _OVER;
×
1158
  }
1159

1160
  (void)strcpy(info.name, "slowLogMaxLen");
8✔
1161
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
8✔
1162
  (void)strcpy(info.scope, "server");
8✔
1163
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1164
    code = terrno;
×
1165
    goto _OVER;
×
1166
  }
1167

1168
  char scopeStr[64] = {0};
8✔
1169
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
8✔
1170
  (void)strcpy(info.name, "slowLogScope");
8✔
1171
  (void)snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
8✔
1172
  (void)strcpy(info.scope, "server");
8✔
1173
  if (taosArrayPush(rsp.variables, &info) == NULL) {
16!
1174
    code = terrno;
×
1175
    goto _OVER;
×
1176
  }
1177

1178
  int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
8✔
1179
  void   *pRsp = rpcMallocCont(rspLen);
8✔
1180
  if (pRsp == NULL) {
8!
1181
    code = terrno;
×
1182
    goto _OVER;
×
1183
  }
1184

1185
  if ((rspLen = tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp)) <= 0) {
8!
1186
    code = rspLen;
×
1187
    goto _OVER;
×
1188
  }
1189

1190
  pReq->info.rspLen = rspLen;
8✔
1191
  pReq->info.rsp = pRsp;
8✔
1192
  code = 0;
8✔
1193

1194
_OVER:
8✔
1195

1196
  if (code != 0) {
8!
1197
    mError("failed to get show variables info since %s", tstrerror(code));
×
1198
  }
1199

1200
  tFreeSShowVariablesRsp(&rsp);
8✔
1201
  TAOS_RETURN(code);
8✔
1202
}
1203

1204
// Handles a create-dnode request.
//
// Checks the dnode and cpu-core grants, deserializes the request, verifies the
// caller's privilege, validates the endpoint (non-empty fqdn, port in
// 1..UINT16_MAX), rejects endpoints that already exist, and then starts the
// create transaction via mndCreateDnode(). An audit record is written once the
// create has been attempted.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
// otherwise the error code of the failing step.
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};

  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, sizeof(ep), "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // Bounded formatting: the audit object name can never overrun its buffer.
  char obj[200] = {0};
  (void)snprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1252

1253
// The actual restore logic lives in mndProcessRestoreDnodeReqImpl(); when
// TD_ENTERPRISE is not defined, the stub below is compiled instead.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests; forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1260

1261
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
14✔
1262
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1263
  int32_t  code = -1;
14✔
1264
  SSdbRaw *pRaw = NULL;
14✔
1265
  STrans  *pTrans = NULL;
14✔
1266

1267
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
14✔
1268
  if (pTrans == NULL) {
14!
1269
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1270
    if (terrno != 0) code = terrno;
×
1271
    goto _OVER;
×
1272
  }
1273
  mndTransSetSerial(pTrans);
14✔
1274
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
14!
1275
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
14!
1276

1277
  pRaw = mndDnodeActionEncode(pDnode);
14✔
1278
  if (pRaw == NULL) {
14!
1279
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1280
    if (terrno != 0) code = terrno;
×
1281
    goto _OVER;
×
1282
  }
1283
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
14!
1284
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
14!
1285
  pRaw = NULL;
14✔
1286

1287
  pRaw = mndDnodeActionEncode(pDnode);
14✔
1288
  if (pRaw == NULL) {
14!
1289
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1290
    if (terrno != 0) code = terrno;
×
1291
    goto _OVER;
×
1292
  }
1293
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
14!
1294
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
14!
1295
  pRaw = NULL;
14✔
1296

1297
  if (pMObj != NULL) {
14✔
1298
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
2!
1299
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
2!
1300
  }
1301

1302
  if (pQObj != NULL) {
14!
UNCOV
1303
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1304
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
×
1305
  }
1306

1307
  if (pSObj != NULL) {
14!
UNCOV
1308
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1309
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
×
1310
  }
1311

1312
  if (numOfVnodes > 0) {
14✔
1313
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
9!
1314
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
9✔
1315
  }
1316

1317
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
13!
1318

1319
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
13✔
1320
                                   1);  // TODO: check the return value
1321
  code = 0;
13✔
1322

1323
_OVER:
14✔
1324
  mndTransDrop(pTrans);
14✔
1325
  sdbFreeRaw(pRaw);
14✔
1326
  TAOS_RETURN(code);
14✔
1327
}
1328

1329
// Tells whether a dnode hosts nothing: no qnode, no snode, no mnode and no
// vnodes. The checks run in that order and stop at the first hit.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SQnodeObj *pQnode = NULL;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  bool       empty = false;

  pQnode = mndAcquireQnode(pMnode, dnodeId);
  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  // Release whatever was acquired; pointers never acquired are still NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1354

1355
// Handles a drop-dnode request.
//
// The dnode is looked up by id first and, failing that, by its "fqdn:port"
// endpoint. The drop is refused when
//   - the dnode hosts the only mnode (TSDB_CODE_MND_TOO_FEW_MNODES),
//   - the dnode hosts an mnode and is this mnode's own dnode
//     (TSDB_CODE_MND_CANT_DROP_LEADER),
//   - force (or unsafe, which implies force) is used on an online dnode,
//   - the dnode is offline, not empty, and force is not set.
// Otherwise the drop transaction is started and an audit record is written.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
// otherwise the error code of the failing step.
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked is MND_OPER_DROP_MNODE although this
  // handler drops a dnode - confirm this is intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a fixed format; request data must never be used
    // as the format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode actually resolved: when it was found by endpoint,
  // the id in the request is not the one that identifies it.
  pQObj = mndAcquireQnode(pMnode, pDnode->id);
  pSObj = mndAcquireSnode(pMnode, pDnode->id);
  pMObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == pDnode->id) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record carries the id exactly as it was requested.
  char obj1[30] = {0};
  (void)snprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1439

1440
// Converts an mnode-level config request into the dnode-level form.
//
// Two input shapes are accepted:
//   - 'key value': config holds "key value" and value is empty; the text up to
//     the first space becomes the key, the rest becomes the value.
//   - 'key' 'value': config holds the key (no space) and value is non-empty.
// Any other combination, or a key/value that does not fit into the destination
// fields, is rejected.
//
// Returns 0 on success or TSDB_CODE_INVALID_CFG; on failure pDCfgReq is left
// untouched.
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
  int32_t     code = 0;
  const char *pSep = strchr(pMCfgReq->config, ' ');
  const char *pValue = NULL;
  size_t      optLen = (pSep != NULL) ? (size_t)(pSep - pMCfgReq->config) : strlen(pMCfgReq->config);

  if (pSep != NULL) {
    // 'key value'
    if (strlen(pMCfgReq->value) != 0) goto _err;
    pValue = pSep + 1;
  } else {
    // 'key' 'value'
    if (strlen(pMCfgReq->value) == 0) goto _err;
    pValue = pMCfgReq->value;
  }

  // Validate both lengths before writing so the copies below cannot overrun
  // the destination fields (one byte is reserved for the terminator).
  size_t valLen = strlen(pValue);
  if (optLen >= sizeof(pDCfgReq->config) || valLen >= sizeof(pDCfgReq->value)) goto _err;

  (void)memcpy(pDCfgReq->config, pMCfgReq->config, optLen);
  pDCfgReq->config[optLen] = 0;
  (void)memcpy(pDCfgReq->value, pValue, valLen + 1);

  TAOS_RETURN(code);

_err:
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
  code = TSDB_CODE_INVALID_CFG;
  TAOS_RETURN(code);
}
1471

1472
// Sends a config request to one dnode, or to every dnode when dnodeId is -1
// or 0.
//
// Each matching dnode gets its own serialized copy of the request via
// tmsgSendReq(); the returned code is that of the last send attempted. If no
// dnode matched, TSDB_CODE_MND_DNODE_NOT_EXIST is returned. A serialization
// failure aborts the iteration and returns the serializer's result; the rpc
// buffer, the current dnode row and the open fetch are released first.
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      // First pass only measures the encoded size.
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = NULL;

      if (bufLen > 0) {
        pBuf = rpcMallocCont(bufLen);
        if (pBuf == NULL) {
          // Report the allocation failure instead of leaving code at -1, which
          // would be turned into "dnode not exist" below.
          code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        } else {
          bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq);
        }
      }

      if (bufLen <= 0) {
        // Serialization failed: undo everything still held before bailing out.
        code = bufLen;
        if (pBuf != NULL) rpcFreeCont(pBuf);
        sdbRelease(pSdb, pDnode);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }

      if (pBuf != NULL) {
        mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
        code = tmsgSendReq(&epSet, &rpcMsg);
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // code is still -1 only if no dnode matched the requested id.
  if (code == -1) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
  }
  TAOS_RETURN(code);
}
1505

1506
/*
 * Handle a "config dnode" request.
 *
 * Steps: deserialize the request, check MND_OPER_CONFIG_DNODE privilege,
 * translate it into an SDCfgDnodeReq (special-casing "resetlog" and, in
 * enterprise builds, "s3blocksize"), write an audit record, then forward the
 * request through mndSendCfgDnodeReq(). When the option starts with
 * "enableWhiteList" the user ip white list is refreshed afterwards.
 *
 * Every failure after the privilege check leaves through _err_out so that
 * cfgReq is always freed and the error is returned via TAOS_RETURN.
 */
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
  int8_t updateIpWhiteList = 0;
  mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  if (strcasecmp(cfgReq.config, "resetlog") == 0) {
    (void)strcpy(dcfgReq.config, "resetlog");
#ifdef TD_ENTERPRISE
  } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
    int32_t optLen = strlen("s3blocksize");
    int32_t flag = -1;
    // Use the function-level code (no shadowing) and the shared cleanup path,
    // otherwise cfgReq is leaked when the value cannot be parsed.
    code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
    if (code != 0) goto _err_out;

    if (flag > 1024 * 1024 || (flag > -1 && flag < 1024) || flag < -1) {
      mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]",
             cfgReq.dnodeId, flag);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }

    strcpy(dcfgReq.config, "s3blocksize");
    snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#endif
  } else {
    TAOS_CHECK_GOTO(mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
    if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
      mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
      code = TSDB_CODE_INVALID_CFG;
      goto _err_out;
    }
    if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
      updateIpWhiteList = 1;
    }

    TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true), NULL, _err_out);
  }

  {  // audit
    char obj[50] = {0};
    (void)sprintf(obj, "%d", cfgReq.dnodeId);

    auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
  }

  // Take the target id before releasing the request so nothing is read from
  // cfgReq after tFreeSMCfgDnodeReq().
  const int32_t dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);

  code = mndSendCfgDnodeReq(pMnode, dnodeId, &dcfgReq);

  // the white list is refreshed whether or not the send succeeded
  if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
  TAOS_RETURN(code);

_err_out:
  tFreeSMCfgDnodeReq(&cfgReq);
  TAOS_RETURN(code);
}
1572

1573
/*
 * Handle a config response coming back from a dnode.
 * The response content is not inspected: receipt is logged and 0 is returned.
 */
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
  const int32_t result = 0;

  mInfo("config rsp from dnode");
  return result;
}
1577

1578
/*
 * Broadcast a "create encrypt key" request to the online dnodes.
 *
 * - Rejects keys whose length is outside [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
 * - Uses encryptMgmt.encrypting as an in-progress flag (compare-and-swap 0 -> 1);
 *   a second concurrent request, or a key that is already set/loaded, fails with
 *   TSDB_CODE_QRY_DUPLICATED_OPERATION.
 * - Resets the nEncrypt/nSuccess/nFailed counters, then sends the request to
 *   every online dnode selected by dnodeId (-1 and 0 select all), counting each
 *   successful send in nEncrypt.
 * - If nothing was sent the in-progress flag is cleared again here; otherwise
 *   it stays set after this function returns.
 *
 * Returns 0 on success or an error code; on error terrno is set if it was 0.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      // First call with a NULL buffer only computes the encoded length.
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // Aborting the walk: free the unsent buffer, drop the dnode reference
          // and close the still-open sdb iterator.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
          // With no request in flight no response will ever clear the flag, so
          // clear it here to keep later attempts from being rejected as duplicates.
          if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
            atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
          }
          goto _exit;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Nothing was sent: release the in-progress flag right away.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1655

1656
/*
 * Entry point for a "create encrypt key" request.
 *
 * Enterprise builds: deserialize the request, check MND_OPER_CONFIG_DNODE
 * privilege, require the option to be exactly "encrypt_key" (case-insensitive;
 * the 12-byte compare includes the terminator), copy option and value into an
 * SDCfgDnodeReq and hand over to mndProcessCreateEncryptKeyReqImpl().
 * Any other option yields TSDB_CODE_PAR_INTERNAL_ERROR.
 *
 * Other builds: returns 0 without doing anything.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#ifdef TD_ENTERPRISE
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};

  // The value comes from the request; refuse anything that cannot fit into the
  // destination buffer instead of overflowing it. The same error code is used
  // by the length check in mndProcessCreateEncryptKeyReqImpl().
  if (strlen(cfgReq.value) >= sizeof(dcfgReq.value)) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  (void)snprintf(dcfgReq.config, sizeof(dcfgReq.config), "%s", cfgReq.config);
  (void)snprintf(dcfgReq.value, sizeof(dcfgReq.value), "%s", cfgReq.value);

  // Take the target id before releasing the request so nothing is read from
  // cfgReq after tFreeSMCfgDnodeReq().
  const int32_t dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1685

1686
/*
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code,
 * and clears the encryptMgmt.encrypting flag once the number of responses
 * (successes + failures) has reached the number of requests sent (nEncrypt).
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Completion depends on BOTH running totals: bump the counter matching this
  // response and read the other one. Treating the other counter as 0 would mean
  // a mix of successful and failed responses never adds up to nReq, leaving the
  // encrypting flag set.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1710

1711
/*
 * Fill pBlock with a fixed list of (option name, value) rows taken from the
 * global settings: statusInterval, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope.
 *
 * Column 0 receives the option name and column 1 the formatted value, both as
 * var-strings. Stops at the first colDataSetVal() failure (which is logged).
 * The number of complete rows written is added to pShow->numOfRows and returned.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nCfg = 0;      // number of entries collected below
  int32_t nWritten = 0;  // number of rows fully stored in pBlock
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};

  names[nCfg] = "statusInterval";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
  ++nCfg;

  names[nCfg] = "timezone";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
  ++nCfg;

  names[nCfg] = "locale";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
  ++nCfg;

  names[nCfg] = "charset";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
  ++nCfg;

  names[nCfg] = "monitor";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
  ++nCfg;

  names[nCfg] = "monitorInterval";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
  ++nCfg;

  names[nCfg] = "slowLogThreshold";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
  ++nCfg;

  names[nCfg] = "slowLogMaxLen";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
  ++nCfg;

  // The scope is rendered into a scratch string first, then stored like the others.
  char scopeText[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeText);
  names[nCfg] = "slowLogScope";
  (void)snprintf(values[nCfg], TSDB_CONFIG_VALUE_LEN, "%s", scopeText);
  ++nCfg;

  char nameVar[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueVar[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  int32_t idx = 0;
  while (idx < nCfg) {
    // column 0: option name
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameVar, names[idx], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, nWritten, (const char *)nameVar, false), &lino, _OVER);

    // column 1: option value
    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueVar, values[idx], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, nWritten, (const char *)valueVar, false), &lino, _OVER);

    nWritten++;
    idx++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nWritten;
  return nWritten;
}
1782

1783
// Cancel hook with an intentionally empty body: neither pMnode nor pIter is used.
// NOTE(review): presumably the counterpart of mndRetrieveConfigs, which keeps no
// iterator in pShow->pIter and so has nothing to cancel; confirm at the registration site.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1784

1785
/*
 * Fill pBlock with up to `rows` rows, one per dnode object.
 *
 * Columns written, in order: dnode id, endpoint, current vnode count,
 * supported vnode count, status string, created time, reboot time, offline
 * reason (empty when the dnode is online) and, in enterprise builds, the
 * machine id.
 *
 * The status string is "ready"/"creating"/"dropping" for online dnodes and
 * "offline"/"creating*"/"dropping*" for dnodes that are not online.
 *
 * Iteration state is kept in pShow->pIter so a later call resumes where this
 * one stopped. The number of rows written is added to pShow->numOfRows and
 * returned.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Build the offline-reason var-string in the stack buffer, bounded by its
    // size. A per-row heap buffer would need a NULL check and would be leaked
    // whenever RETRIEVE_CHECK_GOTO jumps to _OVER.
    STR_WITH_MAXSIZE_TO_VARSTR(buf, online ? "" : offlineReason[pDnode->offlineReason], sizeof(buf));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1867

1868
// Cancel an in-progress SDB_DNODE fetch identified by pIter.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1872

1873
// get int32_t value from 'SMCfgDnodeReq'
1874
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
×
1875
  int32_t code = 0;
×
1876
  if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
×
1877
    goto _err;
×
1878
  }
1879

1880
  if (' ' == pMCfgReq->config[optLen]) {
×
1881
    // 'key value'
1882
    if (strlen(pMCfgReq->value) != 0) goto _err;
×
1883
    *pOutValue = atoi(pMCfgReq->config + optLen + 1);
×
1884
  } else {
1885
    // 'key' 'value'
1886
    if (strlen(pMCfgReq->value) == 0) goto _err;
×
1887
    *pOutValue = atoi(pMCfgReq->value);
×
1888
  }
1889

1890
  TAOS_RETURN(code);
×
1891

1892
_err:
×
1893
  mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
×
1894
  code = TSDB_CODE_INVALID_CFG;
×
1895
  TAOS_RETURN(code);
×
1896
}
1897

1898
/*
 * Collect a duplicated copy of every dnode's fqdn.
 *
 * Returns an array of `char *` (one heap-allocated string per dnode), or NULL
 * if the array itself cannot be created. A dnode whose fqdn cannot be
 * duplicated or stored is logged and skipped, so the result may hold fewer
 * entries than there are dnodes but never a NULL entry.
 *
 * NOTE(review): the caller presumably owns the array and the strings in it;
 * confirm against the call sites.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    // Without an array every push would fail and every duplicate would leak.
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Do not store a NULL entry that a consumer could dereference.
      mError("dnode:%d, failed to dup fqdn, but continue at this time", pObj->id);
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the string, so it must be freed here.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc