• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.47
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "taos_monitor.h"
33
#include "tconfig.h"
34
#include "tjson.h"
35
#include "tmisce.h"
36
#include "tunit.h"
37

38
#define TSDB_DNODE_VER_NUMBER   2
39
#define TSDB_DNODE_RESERVE_SIZE 40
40

41
/*
 * Human-readable text for each offline reason, indexed by the numeric
 * reason value. Slot 0 is the empty string; the final slot is the
 * catch-all "unknown".
 */
static const char *offlineReason[] = {
    [0] = "",
    [1] = "status msg timeout",
    [2] = "status not received",
    [3] = "version not match",
    [4] = "dnodeId not match",
    [5] = "clusterId not match",
    [6] = "statusInterval not match",
    [7] = "timezone not match",
    [8] = "locale not match",
    [9] = "charset not match",
    [10] = "ttlChangeOnWrite not match",
    [11] = "enableWhiteList not match",
    [12] = "encryptionKey not match",
    [13] = "monitor not match",
    [14] = "monitor switch not match",
    [15] = "monitor interval not match",
    [16] = "monitor slow log threshold not match",
    [17] = "monitor slow log sql max len not match",
    [18] = "monitor slow log scope not match",
    [19] = "unknown",
};
63

64
enum {
65
  DND_ACTIVE_CODE,
66
  DND_CONN_ACTIVE_CODE,
67
};
68

69
enum {
70
  DND_CREATE,
71
  DND_ADD,
72
  DND_DROP,
73
};
74

75
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
76
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
77
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
78
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
79
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
80
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
81
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
82

83
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
84
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
85
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
86
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
87
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
89
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
90
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
91
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
92
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
93

94
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
96
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
97
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
98

99
#ifdef _GRANT
100
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
101
#else
102
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
103
#endif
104

105
/*
 * Registers the dnode message handlers and show-table callbacks on the mnode,
 * then installs the SDB_DNODE table descriptor.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* SDB callbacks for the dnode table; every field not listed stays zero. */
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  /* Request/response handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* Show-table retrieval and iterator-cancel callbacks. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
136

137
/* Counterpart of mndInitDnode(); currently there is nothing to tear down. */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
138

139
/*
 * Deploy callback for the dnode table: builds dnode 1 from the local
 * fqdn/port, encodes it and commits it through a "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise the failing step's error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  STrans  *pTrans = NULL;
  SSdbRaw *pCommit = NULL;
  char    *machineId = NULL;

  SDnodeObj first = {0};
  first.id = 1;
  first.port = tsServerPort;
  first.createdTime = taosGetTimestampMs();
  first.updateTime = first.createdTime;
  tstrncpy(first.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  first.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(first.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId == NULL) {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  } else {
    (void)memcpy(first.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, first.ep);

  pCommit = mndDnodeActionEncode(&first);
  if (pCommit == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommit), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommit, SDB_STATUS_READY), NULL, _OVER);
  pCommit = NULL; /* cleared so the cleanup below does not free it */

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, first.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommit);
  return code;
}
192

193
/*
 * Serializes a dnode object into a freshly allocated SDB raw record
 * (version TSDB_DNODE_VER_NUMBER).
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  /* Assume failure until every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
225

226
/*
 * Rebuilds a dnode row from an SDB raw record. Soft versions 1 through
 * TSDB_DNODE_VER_NUMBER are accepted; records newer than version 1 carry two
 * length-prefixed binary fields that are read and discarded.
 *
 * Returns the new row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  /* Assume failure until the whole record has been read. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    /* Two (length, bytes) pairs; the payloads are skipped. */
    int16_t skipLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
277

278
/*
 * SDB insert hook: derives the "fqdn:port" endpoint string for the new row
 * and marks the dnode as not having reported status yet. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
  return 0;
}
287

288
/* SDB delete hook: only traces the event; the row needs no extra cleanup here. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t rc = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return rc;
}
292

293
/*
 * SDB update hook: copies the mutable fields of the new row into the live
 * one: updateTime always, machineId only in TD_ENTERPRISE/TD_ASTRA_TODO
 * builds. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
301

302
/*
 * Looks up a dnode by id in the SDB and returns it acquired.
 *
 * On failure returns NULL and translates the SDB-level terrno into the
 * dnode-level equivalent (not exist / in creating / in dropping). Any other
 * cause is logged as fatal and reported as TSDB_CODE_APP_ERROR.
 *
 * A non-NULL result must be handed back through mndReleaseDnode().
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    /* Log while terrno still holds the real cause, then replace it. */
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    terrno = TSDB_CODE_APP_ERROR;
  }

  return NULL;
}
320

321
/* Hands a dnode object back to the mnode's SDB via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
325

326
/*
 * Builds an endpoint set from the dnode's fqdn and port.
 * The result of addEpIntoEpSet() is stored in terrno.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = rc;
  return result;
}
331

332
/*
 * Resolves a dnode id to its endpoint set. Returns a zeroed set when the
 * dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);

  if (pTarget != NULL) {
    result = mndGetDnodeEpset(pTarget);
    mndReleaseDnode(pMnode, pTarget);
  }

  return result;
}
342

343
/*
 * Scans the dnode table with sdbFetch() for a row whose endpoint equals
 * pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * Returns the matching dnode without releasing it; on no match returns NULL
 * with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
363

364
/*
 * Same endpoint lookup as mndAcquireDnodeByEp(), but iterates with
 * sdbFetchAll() and leaves terrno untouched when nothing matches.
 *
 * Returns the matching dnode without releasing it, or NULL.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;
    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &rowStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
384

385
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
389

390
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
394

395
/*
 * A dnode counts as online while the absolute gap between its last access
 * time and curMs is at most 5000 * tsStatusInterval.
 *
 * Side effect: when the gap is exceeded for a dnode that has a reboot time
 * and is still flagged DND_REASON_ONLINE, its offline reason becomes
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t gapMs = TABS(pDnode->lastAccessTime - curMs);
  const int64_t limitMs = 5000 * (int64_t)tsStatusInterval;

  if (gapMs <= limitMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
405

406
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * row returned by sdbFetchAll() on the dnode table.
 *
 * Best effort: a failed push is logged and the scan continues.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Copy everything needed out of the row before giving it back. */
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be touched after the release; use the copied id. */
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
432

433
/*
 * Appends one SDnodeInfo (id, fqdn, port, offline reason, isMnode flag) to
 * pDnodeInfo for every row returned by sdbFetchAll() on the dnode table.
 *
 * Returns 0 on success. If a push fails, the iteration is cancelled and the
 * terrno observed at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Zero first so no indeterminate bytes are copied into the array. */
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be touched after the release; use the copied id. */
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
465

466
/*
 * Compares one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
 * with the identically named local global <para>. On mismatch it logs, sets
 * terrno and makes the ENCLOSING function return `err`.
 *
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site.
 *
 * terrno is set to TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, the same error code
 * the tsSlowLogExceptDb check uses, so it never holds a DND_REASON_* value.
 * Wrapped in do/while(0) so it behaves as a single statement.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
472

473
/*
 * Validates the cluster configuration reported by a dnode against this
 * mnode's settings, stopping at the first mismatch.
 *
 * Returns DND_REASON_ONLINE when everything matches, otherwise the
 * DND_REASON_* value describing the first difference (terrno is set as well).
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Monitor parameters. */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /* Status reporting interval. */
  if (tsStatusInterval != pCfg->statusInterval) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
           tsStatusInterval);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }

  /* A differing timezone string is only rejected when checkTime differs too. */
  if ((taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pMnode->checkTime != pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (tsTtlChangeOnWrite != pCfg->ttlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* Normalise the local whitelist switch to 0/1 before comparing. */
  int8_t enable = 0;
  if (tsEnableWhiteList) {
    enable = 1;
  }
  if (enable != pCfg->enableWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* The encryption key is not compared while encryptMgmt.encrypting is set. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
536

537
/*
 * Rate at which a counter advanced between two samples, in counts per
 * millisecond.
 *
 * Returns 0.0 unless both the timestamp and the counter strictly increased.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t countDelta = currentCount - lastCount;
    int64_t elapsedMs = currentTimeMs - lastTimeMs;
    return (double)countDelta / (double)elapsedMs;
  }

  return 0.0;
}
547

548
/*
 * Folds the load reported for one vnode replica into the mnode's record of
 * that replica.
 *
 * Applied/commit indexes and buffer-segment counters are always copied. While
 * the commit index is ahead of the applied index, the applied rate is
 * recomputed whenever the applied index moved. Sync state, term, restore,
 * can-read, start time and role time are copied only when one of them
 * differs (a reported term of -1 is not compared).
 *
 * Returns true when that second group was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  /* Decide on a role change before any field is overwritten. */
  bool roleDiffers = (pGid->syncState != pVload->syncState);
  if (!roleDiffers && pVload->syncTerm != -1) {
    roleDiffers = (pGid->syncTerm != pVload->syncTerm);
  }
  if (!roleDiffers) {
    roleDiffers = (pGid->roleTimeMs != pVload->roleTimeMs);
  }

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      /* First observation: just remember when we saw it. */
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  bool needsSync = roleDiffers || pGid->syncRestore != pVload->syncRestore ||
                   pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!needsSync) {
    return false;
  }

  /* Log the old values before they are replaced. */
  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
587

588
/*
 * Copies the sync state, term, restore flag and role time reported for an
 * mnode into its object when any of them differs (a reported term of -1 is
 * not compared).
 *
 * Returns true when the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  bool roleDiffers = (pObj->syncState != pMload->syncState);
  if (!roleDiffers && pMload->syncTerm != -1) {
    roleDiffers = (pObj->syncTerm != pMload->syncTerm);
  }
  if (!roleDiffers) {
    roleDiffers = (pObj->roleTimeMs != pMload->roleTimeMs);
  }

  if (!roleDiffers && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  /* Log the transition before the old values are replaced. */
  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
605

606
extern char   *tsMonFwUri;
607
extern char   *tsMonSlowLogUri;
608
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
4✔
609
  SMnode    *pMnode = pReq->info.node;
4✔
610
  SStatisReq statisReq = {0};
4✔
611
  int32_t    code = -1;
4✔
612

613
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
4!
614

615
  if (tsMonitorLogProtocol) {
4!
616
    mInfo("process statis req,\n %s", statisReq.pCont);
×
617
  }
618

619
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
4!
620
    monSendContent(statisReq.pCont, tsMonFwUri);
4✔
621
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
622
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
623
  }
624

625
  tFreeSStatisReq(&statisReq);
4✔
626
  return 0;
4✔
627
}
628

629
/*
 * Handles TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserializes the request and records it through auditAddRecord().
 * Otherwise the message is ignored.
 *
 * Returns 0, or the deserialization error.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &req));

  mDebug("received audit req:%s, %s, %s, %s", req.operation, req.db, req.table, req.pSql);

  auditAddRecord(pReq, pMnode->clusterId, req.operation, req.db, req.table, req.pSql, req.sqlLen);

  tFreeSAuditReq(&req);
  return 0;
}
646

647
/*
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue,
 * carrying the dnode's id and machine id.
 *
 * Returns 0 on success or a negative error code. The RPC buffer is freed
 * here when serialization into it fails.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  /* First pass with a NULL buffer only computes the encoded length. */
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    /* Same mapping as the sizing pass, so a zero length is not reported as success. */
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    /* Nothing was queued, so the buffer is still ours to free. */
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  /* NOTE(review): assumes tmsgPutToQueue() disposes of pCont when it fails; confirm. */
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
677

678
/*
 * Handles TDMT_MND_UPDATE_DNODE_INFO: stamps the target dnode with the
 * current time, re-encodes it and commits it through an "update-dnode-obj"
 * transaction.
 *
 * Returns 0 on success or the failing step's error code.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  /*
   * For each NULL result below, leave explicitly instead of depending on
   * terrno carrying an error, so execution can never continue into a NULL
   * dereference. The fallback code matches mndCreateDefaultDnode().
   */
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  pCommitRaw = NULL; /* cleared so the cleanup below does not free it */

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
724

725
/*
 * Process a status (heartbeat) message sent by a dnode.
 *
 * The handler
 *   1. deserializes the SStatusReq and rejects messages from a different cluster,
 *   2. resolves the dnode object by id, falling back to a lookup by endpoint,
 *   3. copies the reported vnode / mnode / qnode load into the in-memory objects,
 *   4. when anything relevant changed (needCheck), validates version, cluster id and cluster
 *      configuration, refreshes the dnode's attributes and attaches an SStatusRsp to pReq,
 *   5. bumps accessTimes / lastAccessTime.
 *
 * Note that the function always returns the result of mndUpdClusterInfo(pReq); the local
 * `code` and the terrno values set on the error paths are not returned directly.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has no id yet (first access): it can only be found by its endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;  // saved because the lookups below may overwrite terrno
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but is registered under another id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  // A full check (and a response body) is only needed when something the dnode cares about changed.
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Fold the per-vnode load reported by the dnode into the matching vgroup objects.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the replica that reports itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: remember it and update the dnode object via mndUpdateDnodeObj().
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only computes the encoded size; a negative value is an error and must not
    // be handed to the allocator.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // pHead has not been attached to pReq yet, so it has to be released here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
946

947
/*
 * Process a notify message from a dnode: copy the reported time-series count of every listed
 * vnode into its vgroup object, then refresh the cluster info via mndUpdClusterInfo().
 *
 * Returns 0 on success; the deserialization error, TSDB_CODE_MND_DNODE_DIFF_CLUSTER, or the
 * result of mndUpdClusterInfo() otherwise.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  // A cluster id of 0 in the request is accepted without comparison.
  int64_t clusterid = mndGetClusterId(pMnode);
  bool    sameCluster = (notifyReq.clusterId == 0) || (notifyReq.clusterId == clusterid);
  if (!sameCluster) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, clusterid, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  int32_t idx = 0;
  while (idx < numOfLoads) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    ++idx;

    SVgObj *pVgObj = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVgObj == NULL) {
      continue;  // unknown vgroup: nothing to update
    }
    pVgObj->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVgObj);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
980

981
/*
 * Build a new dnode object from a create request and submit it through a
 * "create-dnode" transaction (rollback policy, global conflict level).
 *
 * pMnode:  mnode handle
 * pReq:    originating request, attached to the transaction
 * pCreate: supplies fqdn and port of the new dnode
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pEncoded = NULL;
  STrans  *pTrans = NULL;

  // Fill in the in-memory object first; the id comes from sdbGetMaxId() on the dnode table.
  SDnodeObj newDnode = {0};
  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pEncoded = mndDnodeActionEncode(&newDnode);
  if (pEncoded == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pEncoded), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pEncoded, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free the raw after it was appended to the transaction.
  pEncoded = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  // TODO: check the return value
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, newDnode.fqdn, IP_WHITE_ADD, 1);

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pEncoded);
  return code;
}
1023

1024
/*
 * Answer a "dnode list" request: collect id, fqdn and port of every dnode in the sdb,
 * serialize them as an SDnodeListRsp and attach the buffer to pReq->info.rsp.
 *
 * Returns 0 on success, otherwise an error code. On failure no response buffer is
 * attached to pReq.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: release the row and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Sizing pass; a negative length is an error and must not reach the allocator.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    code = rspLen;
    // The buffer is not attached to pReq on this path, so free it here to avoid a leak.
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1085

1086
void getSlowLogScopeString(int32_t scope, char *result) {
×
1087
  if (scope == SLOW_LOG_TYPE_NULL) {
×
1088
    (void)strncat(result, "NONE", 64);
×
1089
    return;
×
1090
  }
1091
  while (scope > 0) {
×
1092
    if (scope & SLOW_LOG_TYPE_QUERY) {
×
1093
      (void)strncat(result, "QUERY", 64);
×
1094
      scope &= ~SLOW_LOG_TYPE_QUERY;
×
1095
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1096
      (void)strncat(result, "INSERT", 64);
×
1097
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1098
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1099
      (void)strncat(result, "OTHERS", 64);
×
1100
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1101
    } else {
1102
      (void)printf("invalid slow log scope:%d", scope);
×
1103
      return;
×
1104
    }
1105

1106
    if (scope > 0) {
×
1107
      (void)strncat(result, "|", 64);
×
1108
    }
1109
  }
1110
}
1111

1112
/*
 * Handle a CREATE DNODE request.
 *
 * Checks the dnode and cpu-core grants, deserializes the request, verifies the caller's
 * privilege and the endpoint, rejects an endpoint that already exists, and finally starts
 * the create transaction through mndCreateDnode(). An audit record is written once the
 * transaction has been attempted.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int32_t         lino = 0;

  // Both grants must pass; the second one is only evaluated when the first succeeded.
  code = grantCheck(TSDB_GRANT_DNODE);
  if (code == 0) {
    code = grantCheck(TSDB_GRANT_CPU_CORES);
  }
  if (code != 0) {
    goto _OVER;
  }

  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
  TAOS_CHECK_GOTO(code, &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE);
  TAOS_CHECK_GOTO(code, &lino, _OVER);

  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  char auditObj[200] = {0};
  (void)tsnprintf(auditObj, sizeof(auditObj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", auditObj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1169

1170
// Implementation hook for RESTORE DNODE; declared here and defined below only when
// TD_ENTERPRISE is not set.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

/* Entry point for RESTORE DNODE requests: forwards to the implementation hook. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and reports success. */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1177

1178
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
×
1179
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1180
  int32_t  code = -1;
×
1181
  SSdbRaw *pRaw = NULL;
×
1182
  STrans  *pTrans = NULL;
×
1183
  int32_t  lino = 0;
×
1184

1185
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
×
1186
  if (pTrans == NULL) {
×
1187
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1188
    if (terrno != 0) code = terrno;
×
1189
    goto _OVER;
×
1190
  }
1191
  mndTransSetGroupParallel(pTrans);
×
1192
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
×
1193
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
×
1194

1195
  pRaw = mndDnodeActionEncode(pDnode);
×
1196
  if (pRaw == NULL) {
×
1197
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1198
    if (terrno != 0) code = terrno;
×
1199
    goto _OVER;
×
1200
  }
1201
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
×
1202
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
×
1203
  pRaw = NULL;
×
1204

1205
  pRaw = mndDnodeActionEncode(pDnode);
×
1206
  if (pRaw == NULL) {
×
1207
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1208
    if (terrno != 0) code = terrno;
×
1209
    goto _OVER;
×
1210
  }
1211
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
×
1212
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
×
1213
  pRaw = NULL;
×
1214

1215
  if (pSObj != NULL) {
×
1216
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1217
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
×
1218
  }
1219

1220
  if (pMObj != NULL) {
×
1221
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1222
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
×
1223
  }
1224

1225
  if (pQObj != NULL) {
×
1226
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1227
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
×
1228
  }
1229

1230
  if (pBObj != NULL) {
×
1231
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
1232
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
×
1233
  }
1234

1235
  if (numOfVnodes > 0) {
×
1236
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
×
1237
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
×
1238
  }
1239

1240
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
×
1241

1242
  code = 0;
×
1243

1244
_OVER:
×
1245
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
×
1246
  mndTransDrop(pTrans);
×
1247
  sdbFreeRaw(pRaw);
×
1248
  TAOS_RETURN(code);
×
1249
}
1250

1251
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no vnodes.
 *
 * The checks run in that order and stop at the first component found. Every object that
 * was acquired is released before returning.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SMnodeObj *pMn = NULL;
  SSnodeObj *pSn = NULL;
  SQnodeObj *pQn = mndAcquireQnode(pMnode, dnodeId);

  if (pQn == NULL) {
    pSn = mndAcquireSnode(pMnode, dnodeId);
    if (pSn == NULL) {
      pMn = mndAcquireMnode(pMnode, dnodeId);
      if (pMn == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  mndReleaseMnode(pMnode, pMn);
  mndReleaseQnode(pMnode, pQn);
  mndReleaseSnode(pMnode, pSn);
  return empty;
}
1276

1277
/*
 * Handle a DROP DNODE request.
 *
 * The dnode is looked up by dropReq.dnodeId and, if that fails, by the "fqdn:port" endpoint
 * given in the request. The drop is refused when
 *   - the dnode hosts the only mnode, or the mnode of this very node,
 *   - (USE_MOUNT builds) a mount exists on the dnode and the drop is not forced,
 *   - a forced drop targets a dnode that is still online,
 *   - one of its vnodes is offline and the drop is not forced.
 * "unsafe" implies "force". On success the drop transaction is started through mndDropDnode()
 * and an audit record is written.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although a dnode is being
  // dropped - confirm whether a dnode-specific operation code should be used instead.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a fixed format; the request's fqdn must never be used as the
    // format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // From here on use the id of the dnode that was actually resolved: when the lookup fell back
  // to the endpoint, dropReq.dnodeId does not identify this dnode.
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // NOTE(review): this line and the per-replica line below are logged at error level although
  // they look purely informational - consider lowering the level.
  mError("vnode num:%d", numOfVnodes);

  // Scan all vgroups for a replica on this dnode that is in the offline sync state.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Stopping early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1400

1401
/*
 * Distribute a "create encrypt key" request to the dnodes.
 *
 * pReq:     originating request (used for the mnode handle and trace id)
 * dnodeId:  target dnode; -1 or 0 addresses every dnode
 * pDcfgReq: config request forwarded to each dnode; value holds the key
 *
 * The key length must lie in [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]. Only one distribution may
 * run at a time (guarded by encryptMgmt.encrypting), and the request is refused when a key is
 * already set or loaded. Dnodes whose offlineReason is not DND_REASON_ONLINE are skipped.
 * encryptMgmt.nEncrypt counts the requests that were sent successfully; if none was sent the
 * `encrypting` flag is cleared again before returning.
 *
 * Returns 0 on success, otherwise an error code (also stored in terrno when terrno was 0).
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the "encrypting" flag; a non-zero previous value means another request is in progress.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // Abort the distribution: free the unsent buffer, release the row and cancel the
          // iterator. Leaving the loop with `break` (instead of jumping to _exit) lets the
          // check below clear the `encrypting` flag when no request is in flight.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Nothing was sent, so no response will ever arrive to clear the flag: clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1478

1479
/*
 * Entry point for a "create encrypt key" request.
 *
 * When TD_ENTERPRISE or TD_ASTRA_TODO is defined, the SMCfgDnodeReq is deserialized, the
 * caller's MND_OPER_CONFIG_DNODE privilege is checked, and a request whose config name is
 * "encrypt_key" is handed to mndProcessCreateEncryptKeyReqImpl(); any other config name yields
 * TSDB_CODE_PAR_INTERNAL_ERROR. Without those macros the function does nothing and returns 0.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE);
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};

  // Anything other than the "encrypt_key" option is rejected.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy name and value into the dnode-level request before the mnode-level one is freed.
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1508

1509
/*
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * Bumps the success or failure counter in pMnode->encryptMgmt depending on
 * pRsp->code, then compares the total number of responses received so far
 * against the number of requests recorded in nEncrypt. Once every request has
 * been answered, the `encrypting` flag is cleared.
 *
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;

  if (0 == pRsp->code) {
    (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
  } else {
    (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
  }

  // Read BOTH counters after the increment. Using only the counter that was
  // just bumped (and 0 for the other) means a mix of successful and failed
  // responses never sums up to nReq, leaving `encrypting` set forever.
  int16_t nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  int16_t nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  // `trace` is referenced by name inside the mGInfo logging macro.
  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1533

1534
/*
 * Fill pBlock with a fixed list of cluster-level configuration items, one row
 * per item: column 0 receives the option name, column 1 its current value
 * rendered as text.
 *
 * The number of rows written is added to pShow->numOfRows and returned. If
 * writing a cell fails, the error is logged and only the rows completed before
 * the failure are counted.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char    scopeStr[64] = {0};
  int32_t nCfg = 0;     // number of name/value pairs collected
  int32_t nFilled = 0;  // number of rows fully written into pBlock
  int32_t code = 0;
  int32_t lino = 0;

  /* Collect the name/value pairs. */
  cfgOpts[nCfg] = "statusInterval";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);

  cfgOpts[nCfg] = "timezone";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[nCfg] = "locale";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[nCfg] = "charset";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[nCfg] = "monitor";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[nCfg] = "monitorInterval";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[nCfg] = "slowLogThreshold";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[nCfg] = "slowLogMaxLen";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[nCfg] = "slowLogScope";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  /* Emit one row per pair: name into column 0, value into column 1. */
  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  while (nFilled < nCfg) {
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[nFilled], TSDB_CONFIG_OPTION_LEN);
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, nFilled, (const char *)nameBuf, false), &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[nFilled], TSDB_CONFIG_VALUE_LEN);
    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, nFilled, (const char *)valueBuf, false), &lino, _OVER);

    nFilled++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += nFilled;
  return nFilled;
}
1605

1606
/* Cancel callback for the configs listing; intentionally does nothing. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1607

1608
/*
 * Fill pBlock with up to `rows` dnode entries, continuing from pShow->pIter.
 *
 * Per dnode the columns written are, in order: id, endpoint, number of vnodes
 * (from mndGetVnodesNum), numOfSupportVnodes, a status string, createdTime,
 * rebootTime, the offline reason (empty when the dnode is online) and, in
 * enterprise builds, the machine id.
 *
 * The number of rows written is added to pShow->numOfRows and returned. On a
 * cell-write or allocation failure the error is logged and only the rows
 * completed before the failure are counted.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status text: a trailing '*' marks an object that is being created or
    // dropped while its dnode is not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Only look the reason up when the dnode is actually offline.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Go through the same macro as every other failure in this loop so the
      // dnode object is cleaned up the same way before bailing out.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary buffer before checking the result; otherwise the
    // error path would jump to _OVER and leak it.
    int32_t setCode = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(setCode, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1690

1691
/* Cancel an in-progress dnode listing by cancelling the SDB_DNODE fetch that pIter refers to. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1695

1696
/*
 * Collect the FQDN of every dnode in the sdb.
 *
 * Returns a new array of `char *`, each element a separately duplicated
 * string, or NULL if the array itself cannot be created. A dnode whose FQDN
 * cannot be duplicated or appended is logged and skipped, so the result never
 * contains NULL entries.
 *
 * NOTE(review): the caller presumably owns the array and its strings and must
 * free both - confirm against callers.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    // Nothing has been fetched yet, so there is nothing to release.
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Do not store a NULL entry that a consumer might dereference.
      mError("failed to duplicate dnode fqdn, but continue at this time");
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy is still ours to free.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc