• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4192

30 May 2025 03:55AM UTC coverage: 63.023% (-0.2%) from 63.267%
#4192

push

travis-ci

web-flow
fix:defined col bind in interlace mode (#31246)

157832 of 318864 branches covered (49.5%)

Branch coverage included in aggregate %.

1 of 3 new or added lines in 1 file covered. (33.33%)

2934 existing lines in 172 files now uncovered.

243367 of 317732 relevant lines covered (76.6%)

17346426.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.91
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "audit.h"
19
#include "mndCluster.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tconfig.h"
32
#include "tjson.h"
33
#include "tmisce.h"
34
#include "tunit.h"
35

36
#define TSDB_DNODE_VER_NUMBER   2
37
#define TSDB_DNODE_RESERVE_SIZE 40
38

39
// Human-readable descriptions of why a dnode is considered offline.
// NOTE(review): presumably indexed by the DND_REASON_* value kept in SDnodeObj.offlineReason, so the
// order would have to track that enum; the enum is declared outside this file — confirm before editing.
static const char *offlineReason[] = {
40
    "",
41
    "status msg timeout",
42
    "status not received",
43
    "version not match",
44
    "dnodeId not match",
45
    "clusterId not match",
46
    "statusInterval not match",
47
    "timezone not match",
48
    "locale not match",
49
    "charset not match",
50
    "ttlChangeOnWrite not match",
51
    "enableWhiteList not match",
52
    "encryptionKey not match",
53
    "monitor not match",
54
    "monitor switch not match",
55
    "monitor interval not match",
56
    "monitor slow log threshold not match",
57
    "monitor slow log sql max len not match",
58
    "monitor slow log scope not match",
59
    "unknown",
60
};
61

62
// File-local selector constants (values 0 and 1).
// NOTE(review): presumably distinguish the two kinds of active code; no use is visible in this part
// of the file — confirm the consumers before renumbering.
enum {
63
  DND_ACTIVE_CODE,
64
  DND_CONN_ACTIVE_CODE,
65
};
66

67
// File-local selector constants (values 0, 1 and 2).
// NOTE(review): presumably name the kind of dnode operation being performed; no use is visible in
// this part of the file — confirm the consumers before renumbering.
enum {
68
  DND_CREATE,
69
  DND_ADD,
70
  DND_DROP,
71
};
72

73
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
74
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
75
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
76
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
79
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
84
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
85
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
87
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
89
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
90
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
91

92
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
94
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
96

97
#ifdef _GRANT
98
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
99
#else
100
// Fallback used when _GRANT is not defined: performs no work and reports success (0).
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
101
#endif
102

103
/**
 * Wire the dnode module into the mnode: installs the dnode message handlers,
 * the "show" retrieve / free-iterator callbacks, and finally registers the
 * SDB_DNODE table with its deploy/encode/decode/insert/update/delete hooks.
 *
 * @param pMnode mnode instance the handlers and the table are registered on.
 * @return the result of sdbSetTable() for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Every field that is not assigned below stays zero.
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  // Request/response handlers owned by this module.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  // "show" support for the configs and dnode management tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
134

135
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
136
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
137
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
138
// Cleanup hook of the dnode module; the body is empty, so no work is done here.
void          mndCleanupDnode(SMnode *pMnode) {}
2,188✔
139

140
/**
 * Deploy callback of the dnode table (installed as deployFp by mndInitDnode):
 * builds dnode 1 from the local fqdn/port, commits it through a
 * "create-dnode" transaction and adds the fqdn to the default user's IP
 * white list.
 *
 * @param pMnode mnode instance.
 * @return 0 on success, otherwise the error code of the step that failed.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pRaw = NULL;
  char     *machineId = NULL;
  SDnodeObj dnodeObj = {0};

  const int64_t nowMs = taosGetTimestampMs();
  dnodeObj.id = 1;
  dnodeObj.createdTime = nowMs;
  dnodeObj.updateTime = nowMs;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId != NULL) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  }
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
  else {
    // This build configuration refuses to deploy without a machine id.
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
  }
#endif

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the common exit path below does not free the raw that was appended to the trans.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
193

194
/**
 * Serialize a dnode object into a freshly allocated sdb raw record
 * (version TSDB_DNODE_VER_NUMBER).
 *
 * Layout written: id, createdTime, updateTime, port, fqdn, machineId, the
 * reserved area, then two zero int16 length fields.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL with terrno set when encoding failed.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Pessimistic default: stays in place for every early jump to _OVER.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
226

227
/**
 * Rebuild a dnode row from an sdb raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 the
 * two trailing (int16 length + binary) fields are read and discarded.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Pessimistic default: stays in place for every early jump to _OVER.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two length-prefixed fields follow; their payload is skipped (NULL destination).
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
278

279
/**
 * sdb insert hook for dnodes: marks the new row as "status not received" and
 * derives its "fqdn:port" endpoint string.
 *
 * @param pSdb   sdb instance (not used).
 * @param pDnode row being inserted; offlineReason and ep are overwritten.
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  // No status message can have been received for a row that was just inserted.
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a scratch buffer first, then copy into the row.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
288

289
/**
 * sdb delete hook for dnodes: only traces the event.
 *
 * @param pSdb   sdb instance (not used).
 * @param pDnode row being deleted.
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
293

294
/**
 * sdb update hook for dnodes: copies the mutable fields of the new row into
 * the row already held by sdb.
 *
 * updateTime is always copied; machineId is copied only when TD_ENTERPRISE or
 * TD_ASTRA_TODO is defined.
 *
 * @param pSdb sdb instance (not used).
 * @param pOld row held by sdb, updated in place.
 * @param pNew row carrying the new values.
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
302

303
/**
 * Look up a dnode by id and take a reference on it.
 *
 * On failure the sdb error left in terrno is translated into the matching
 * mnode error:
 *   - object not there -> TSDB_CODE_MND_DNODE_NOT_EXIST
 *   - object creating  -> TSDB_CODE_MND_DNODE_IN_CREATING
 *   - object dropping  -> TSDB_CODE_MND_DNODE_IN_DROPPING
 *   - anything else    -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode to acquire.
 * @return the dnode object, or NULL with terrno set as described above.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    // The message used to say "db" although the object being acquired is a dnode.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
  }

  return NULL;
}
321

322
/**
 * Hand a dnode object back to the mnode's sdb via sdbRelease().
 *
 * @param pMnode mnode instance that owns the sdb.
 * @param pDnode object to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
21,310,660✔
326

327
/**
 * Build an endpoint set holding the dnode's fqdn and port.
 *
 * The result of addEpIntoEpSet() is stored in terrno, so callers can inspect
 * terrno after the call.
 *
 * @param pDnode dnode whose endpoint is used.
 * @return the endpoint set (zero-initialised before the endpoint is added).
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet  result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);

  terrno = rc;
  return result;
}
332

333
/**
 * Resolve a dnode id to its endpoint set.
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode to look up.
 * @return the dnode's endpoint set, or an all-zero set when the dnode cannot
 *         be acquired (terrno is then whatever mndAcquireDnode() left).
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     epSet = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return epSet;
}
343

344
/**
 * Find a dnode by its "fqdn:port" endpoint string (case-insensitive).
 *
 * Iterates the dnode table with sdbFetch(); the match is returned still
 * acquired, every non-matching row is released.
 *
 * @param pMnode mnode instance.
 * @param pEpStr endpoint string to look for.
 * @return the matching dnode, or NULL with terrno set to
 *         TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SDnodeObj *pCandidate = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate)) != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      // Stop the iteration early; the caller now owns the reference.
      sdbCancelFetch(pSdb, pIter);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
364

365
/**
 * Variant of the endpoint lookup that iterates with sdbFetchAll(), which also
 * reports each row's sdb status.
 *
 * The match is returned still acquired; non-matching rows are released.
 * Unlike mndAcquireDnodeByEp(), terrno is not touched when nothing matches.
 *
 * @param pMnode mnode instance.
 * @param pEpStr endpoint string to look for (case-insensitive).
 * @return the matching dnode, or NULL when none matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;

    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &rowStatus, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }
}
385

386
/**
 * @param pMnode mnode instance.
 * @return sdbGetSize() of the SDB_DNODE table.
 */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
390

391
/**
 * @param pMnode mnode instance.
 * @return sdbGetSize() of the SDB_DB table.
 */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
395

396
/**
 * Decide whether a dnode counts as online at time curMs.
 *
 * The dnode is online while |lastAccessTime - curMs| does not exceed
 * 5000 * tsStatusInterval. NOTE(review): presumably that is five status
 * intervals expressed in milliseconds — confirm the unit of tsStatusInterval.
 *
 * Side effect: when the dnode is found offline, has a positive rebootTime and
 * was still marked DND_REASON_ONLINE, its offlineReason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 *
 * @param pDnode dnode to check; offlineReason may be updated.
 * @param curMs  current time to compare lastAccessTime against.
 * @return true when online, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t sinceAccess = TABS(pDnode->lastAccessTime - curMs);
  const int64_t timeout = 5000 * (int64_t)tsStatusInterval;

  if (sinceAccess <= timeout) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
406

407
/**
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) per row of the dnode
 * table to pDnodeEps. Rows are enumerated with sdbFetchAll().
 *
 * A failed array push is logged and the iteration continues, so the result
 * may be incomplete in that case.
 *
 * @param pMnode    mnode instance.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release above; use the id copied into dnodeEp.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
10,058✔
433

434
/**
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) per row
 * of the dnode table to pDnodeInfo. Rows are enumerated with sdbFetchAll().
 *
 * @param pMnode     mnode instance.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success; on a failed array push the iteration is cancelled and
 *         terrno's value at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialised so no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release above; use the id copied into dInfo.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
466

467
/*
 * Compare one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
 * with the mnode's global of the same name. On mismatch: log it, set terrno and
 * return `err` (a DND_REASON_* value) from the ENCLOSING function.
 *
 * Requires `pCfg` and `pDnode` to be in scope at the expansion site.
 *
 * terrno receives TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, in line with the other
 * checks of mndCheckClusterCfgPara(); it previously received `err`, which is an
 * offline-reason value and not an error code.
 * Wrapped in do { } while (0) so the expansion is a single statement.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return (err);                                                                                        \
    }                                                                                                      \
  } while (0)
473

474
/**
 * Verify that the cluster configuration reported by a dnode matches the
 * mnode's own settings.
 *
 * Checks run in this order and stop at the first mismatch: monitor switch,
 * monitor interval, slow-log threshold, slow-log max length, slow-log scope,
 * slow-log except-db, status interval, timezone (only when checkTime also
 * differs), locale, charset, ttlChangeOnWrite, enableWhiteList and, unless an
 * encryption is in progress, the encryption key state/checksum.
 *
 * @param pMnode mnode instance (checkTime and encryptMgmt are read).
 * @param pDnode reporting dnode, used for logging only.
 * @param pCfg   configuration carried by the dnode.
 * @return DND_REASON_ONLINE when everything matches, otherwise the
 *         DND_REASON_* value of the first mismatch, with terrno set.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Numeric monitor parameters. */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  /* String monitor parameter. */
  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  if (tsStatusInterval != pCfg->statusInterval) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
           tsStatusInterval);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }

  /* A differing timezone string is tolerated as long as checkTime agrees. */
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0 && pCfg->checkTime != pMnode->checkTime) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (tsTtlChangeOnWrite != pCfg->ttlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* The local switch is normalised to 0/1 before it is compared. */
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (localWhiteList != pCfg->enableWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* Key state and checksum are compared only while no encryption is in progress. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    if (pCfg->encryptionKeyChksum) {
      terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    } else {
      terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    }
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
537

538
/**
 * Average number of applied entries per millisecond between two samples.
 *
 * @param currentCount  counter value of the newer sample.
 * @param lastCount     counter value of the older sample.
 * @param currentTimeMs timestamp of the newer sample.
 * @param lastTimeMs    timestamp of the older sample.
 * @return (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0
 *         unless both the time and the counter strictly advanced.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  const bool timeAdvanced = currentTimeMs > lastTimeMs;
  const bool countAdvanced = currentCount > lastCount;

  if (!(timeAdvanced && countAdvanced)) {
    return 0.0;
  }

  return (double)(currentCount - lastCount) / (double)(currentTimeMs - lastTimeMs);
}
548

549
/**
 * Fold the load a vnode reported in a status message into the vgroup member
 * record.
 *
 * Applied/commit indexes and buffer-segment figures are always copied. While
 * the vnode lags (commit index > applied index) the applied rate is
 * recomputed whenever the applied index moved since the previous report.
 * Sync state, term, restore/canRead flags, start time and role time are
 * copied only when at least one of them differs.
 *
 * @param vgId   vgroup id, used for logging.
 * @param pGid   member record to update.
 * @param pVload load reported by the vnode.
 * @return true when the role/restore/canRead/start-time state changed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // A reported term of -1 is left out of the role comparison.
  bool roleChanged = pGid->syncState != pVload->syncState ||
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
                     pGid->roleTimeMs != pVload->roleTimeMs;

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First lagging sample: only remember when it was seen.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;

  bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                 pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
588

589
/**
 * Fold the sync state an mnode reported in a status message into its object.
 *
 * Sync state, term, restore flag and role time are copied only when the role
 * (state, term unless reported as -1, role time) or the restore flag differs.
 *
 * @param pObj   mnode object to update.
 * @param pMload load reported for that mnode.
 * @return true when anything was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // A reported term of -1 is left out of the role comparison.
  bool roleChanged = pObj->syncState != pMload->syncState ||
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
                     pObj->roleTimeMs != pMload->roleTimeMs;

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
606

607
extern char   *tsMonFwUri;
608
extern char   *tsMonSlowLogUri;
609
/**
 * Handle TDMT_MND_STATIS: decode the statistics payload and forward it to the
 * monitor URI that matches its type (counter -> tsMonFwUri, slow log ->
 * tsMonSlowLogUri). Other types are decoded and dropped.
 *
 * @param pReq incoming RPC message.
 * @return 0 on success, or the deserialization error (also stored in terrno).
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};
  int32_t    code = -1;

  code = tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq);
  if (code != 0) {
    // A failed decode may already have allocated the content buffer; release it before bailing out.
    tFreeSStatisReq(&statisReq);
    TAOS_RETURN(code);
  }

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
629

630
/**
 * Handle TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, decode the audit request and record it via auditAddRecord(); otherwise
 * the message is ignored.
 *
 * @param pReq incoming RPC message.
 * @return 0 on success or when auditing is disabled, or the deserialization
 *         error (also stored in terrno).
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  int32_t code = tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq);
  if (code != 0) {
    // A failed decode may already have allocated request members; release them before bailing out.
    tFreeSAuditReq(&auditReq);
    TAOS_RETURN(code);
  }

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
647

648
/**
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue that
 * carries the dnode's id and machineId.
 *
 * @param pMnode mnode instance whose write queue receives the message.
 * @param pDnode dnode whose id and machineId are sent.
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded size.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Same mapping as the sizing pass: a zero length must not be reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it has to be released here.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): assumes tmsgPutToQueue() owns pCont from here on, including on failure — confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
678

679
/**
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the dnode's updateTime and
 * persist the object through an "update-dnode-obj" transaction.
 *
 * Steps: decode the request, acquire the dnode, create the transaction,
 * encode the dnode as a commit log, mark the raw READY, prepare the
 * transaction. Every failure jumps to the common cleanup at _exit.
 *
 * @param pReq incoming RPC message.
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;
  SDnodeObj    *pDnode = NULL;
  SDnodeInfoReq infoReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the cleanup below does not free the raw that was appended to the trans.
  pCommitRaw = NULL;

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
725

726
/*
 * Handle a periodic status report sent by a dnode.
 *
 * Steps, in order:
 *   1. Deserialize the request and reject reports from a different cluster.
 *   2. Locate the dnode object, by endpoint when dnodeId is 0, otherwise by id
 *      with endpoint-based fallbacks.
 *   3. Copy the reported vnode loads into the matching vgroups, and the mnode
 *      and qnode loads into their objects.
 *   4. When anything relevant changed (needCheck), validate version, cluster
 *      id and cluster configuration, refresh the dnode's attributes and attach
 *      a serialized SStatusRsp to the request.
 *   5. Update the dnode's access counters.
 *
 * Note that the value returned is the result of mndUpdClusterInfo(pReq); the
 * local `code` and `terrno` assignments are not returned directly.
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode does not know its id yet, so it can only be found by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under another id: record why and stop.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only a (assigned) leader replica overwrites the vgroup-wide statistics.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the reported reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: store it and persist the dnode object.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First call only measures the encoded size; check it before allocating.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Never attach a NULL buffer with a non-zero length to the response.
      taosArrayDestroy(statusRsp.pDnodeEps);
      if (terrno == 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never handed to the request, so release it here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
947

948
/*
 * Handle a notify request from a dnode: copy the reported time-series count
 * of every listed vnode into its vgroup, then refresh the cluster info.
 *
 * Returns the deserialization error (also stored in terrno),
 * TSDB_CODE_MND_DNODE_DIFF_CLUSTER when the request names another cluster,
 * or otherwise the result of mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);
    bool    sameCluster = (notifyReq.clusterId == 0) || (notifyReq.clusterId == localClusterId);

    if (!sameCluster) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
            notifyReq.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t total = taosArrayGetSize(notifyReq.pVloads);
      int32_t idx = 0;
      while (idx < total) {
        SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
        SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
        ++idx;
        if (pVg == NULL) continue;  // vgroup not found: nothing to update

        pVg->numOfTimeSeries = pLoad->nTimeSeries;
        mndReleaseVgroup(pMnode, pVg);
      }
      code = mndUpdClusterInfo(pReq);
    }
  }

  // The request is freed on every path, including the failure paths.
  tFreeSNotifyReq(&notifyReq);
  return code;
}
981

982
/*
 * Build a new dnode object from a create request and submit it through a
 * "create-dnode" transaction.
 *
 * pMnode   mnode that owns the sdb and the transaction.
 * pReq     originating RPC request, passed to the transaction.
 * pCreate  request carrying the fqdn and port of the new dnode.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * After a successful prepare the dnode's fqdn is added to the IP white list of
 * all users; the result of that call is ignored.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pRaw = NULL;
  SDnodeObj dnodeObj = {0};

  // Fill in the new object: next free id, identical created/updated stamps.
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  dnodeObj.port = pCreate->port;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free the appended record.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value
_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1024

1025
/*
 * Handle a "dnode list" request: collect the id, fqdn and port of every dnode
 * in the sdb, serialize them into an SDnodeListRsp and attach the buffer to
 * the request as its response.
 *
 * Returns 0 on success, otherwise an error code. On failure no response buffer
 * is left attached to the request.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: release the object and cancel the iteration.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First call only measures the encoded size; check it before allocating.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    code = rspLen;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // now referenced by the request; must not be freed below
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  // Still non-NULL only when the buffer was allocated but never attached.
  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1086

1087
void getSlowLogScopeString(int32_t scope, char *result) {
1,445✔
1088
  if (scope == SLOW_LOG_TYPE_NULL) {
1,445!
1089
    (void)strncat(result, "NONE", 64);
×
1090
    return;
×
1091
  }
1092
  while (scope > 0) {
5,770✔
1093
    if (scope & SLOW_LOG_TYPE_QUERY) {
4,325✔
1094
      (void)strncat(result, "QUERY", 64);
1,445✔
1095
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,445✔
1096
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
2,880✔
1097
      (void)strncat(result, "INSERT", 64);
1,440✔
1098
      scope &= ~SLOW_LOG_TYPE_INSERT;
1,440✔
1099
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
1,440!
1100
      (void)strncat(result, "OTHERS", 64);
1,440✔
1101
      scope &= ~SLOW_LOG_TYPE_OTHERS;
1,440✔
1102
    } else {
1103
      (void)printf("invalid slow log scope:%d", scope);
×
1104
      return;
×
1105
    }
1106

1107
    if (scope > 0) {
4,325✔
1108
      (void)strncat(result, "|", 64);
2,878✔
1109
    }
1110
  }
1111
}
1112

1113
/*
 * Handle a "create dnode" request.
 *
 * Checks the dnode and cpu-core grants, deserializes the request, checks the
 * caller's privilege, validates the endpoint, rejects an endpoint that is
 * already registered, and then starts the create transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was started,
 * otherwise the error code of the step that failed.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  int32_t         code = -1;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};

  // The cpu-core grant is only checked when the dnode grant passed.
  code = grantCheck(TSDB_GRANT_DNODE);
  if (code == 0) {
    code = grantCheck(TSDB_GRANT_CPU_CORES);
  }
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  bool epValid = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!epValid) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (NULL != pDnode) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // The audit record is written whether or not the create call succeeded.
  char obj[200] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1161

1162
/* Implementation hook for the restore-dnode request; defined below only when TD_ENTERPRISE is not set. */
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1163

1164
/* Entry point for the restore-dnode request: forwards to the Impl function and returns its result. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
7✔
1165

1166
#ifndef TD_ENTERPRISE
1167
/* Stub used when TD_ENTERPRISE is not defined: does nothing and reports success (0). */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1168
#endif
1169

1170
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
26✔
1171
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1172
  int32_t  code = -1;
26✔
1173
  SSdbRaw *pRaw = NULL;
26✔
1174
  STrans  *pTrans = NULL;
26✔
1175
  int32_t  lino = 0;
26✔
1176

1177
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
26✔
1178
  if (pTrans == NULL) {
26!
1179
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1180
    if (terrno != 0) code = terrno;
×
UNCOV
1181
    goto _OVER;
×
1182
  }
1183
  mndTransSetGroupParallel(pTrans);
26✔
1184
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
26!
1185
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
26!
1186

1187
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1188
  if (pRaw == NULL) {
26!
1189
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1190
    if (terrno != 0) code = terrno;
×
UNCOV
1191
    goto _OVER;
×
1192
  }
1193
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
26!
1194
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
26!
1195
  pRaw = NULL;
26✔
1196

1197
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1198
  if (pRaw == NULL) {
26!
1199
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1200
    if (terrno != 0) code = terrno;
×
UNCOV
1201
    goto _OVER;
×
1202
  }
1203
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
26!
1204
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
26!
1205
  pRaw = NULL;
26✔
1206

1207
  if (pMObj != NULL) {
26✔
1208
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1209
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
4!
1210
  }
1211

1212
  if (pQObj != NULL) {
26✔
1213
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1214
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
3!
1215
  }
1216

1217
  if (pSObj != NULL) {
26✔
1218
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1219
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), &lino, _OVER);
3!
1220
  }
1221

1222
  if (numOfVnodes > 0) {
26✔
1223
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
16!
1224
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
16✔
1225
  }
1226

1227
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
24!
1228

1229
  code = 0;
24✔
1230

1231
_OVER:
26✔
1232
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
26!
1233
  mndTransDrop(pTrans);
26✔
1234
  sdbFreeRaw(pRaw);
26✔
1235
  TAOS_RETURN(code);
26✔
1236
}
1237

UNCOV
1238
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The checks run in that order and stop at the first hit, so later
 * objects are never acquired once one has been found.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SMnodeObj *pMnodeObj = NULL;
  SSnodeObj *pSnodeObj = NULL;
  SQnodeObj *pQnodeObj = mndAcquireQnode(pMnode, dnodeId);

  if (pQnodeObj == NULL) {
    pSnodeObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSnodeObj == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  // Release whatever was acquired; pointers never acquired are still NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnodeObj);
  mndReleaseSnode(pMnode, pSnodeObj);
  return empty;
}
1263

1264
/*
 * Handle a "drop dnode" request.
 *
 * The dnode is looked up by id first and, if that fails, by the "fqdn:port"
 * endpoint given in the request. The drop is rejected when:
 *   - the dnode hosts the only mnode, or the mnode of this node itself;
 *   - force/unsafe is requested while the dnode is still online;
 *   - one of its vnodes is offline and force is not set.
 * Otherwise the drop transaction is started and an audit record is written.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was started,
 * otherwise the error code of the check or step that failed.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked is MND_OPER_DROP_MNODE although this
  // handler drops a dnode - confirm whether that is intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a literal format string. The fqdn comes from the
    // request and must never be used as the format itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode actually found: when it was located by endpoint,
  // dropReq.dnodeId may not be its id.
  pQObj = mndAcquireQnode(pMnode, pDnode->id);
  pSObj = mndAcquireSnode(pMnode, pDnode->id);
  pMObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == pDnode->id) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  // Look for the first vgroup whose replica on this dnode is offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early, so it has to be cancelled.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1373

1374
/*
 * Send a "create encrypt key" request to the selected online dnodes.
 *
 * pReq      originating RPC request (its node and trace id are used).
 * dnodeId   -1 or 0 selects every dnode, any other value selects that dnode.
 * pDcfgReq  config request forwarded to each dnode; its value is the key.
 *
 * The key length is validated first. The mnode's `encrypting` flag is then
 * switched from 0 to 1 so only one operation runs at a time; it is cleared
 * again here when no request ended up being sent, otherwise it is left set
 * (the response handler clears it). Returns 0 on success or an error code,
 * which is also stored in terrno if terrno was still 0.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the flag; a non-zero previous value means an operation is running.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never sent: free it, release the object and cancel
          // the iteration. Then leave the loop via break (not a jump to
          // _exit) so the flag reset below still runs.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          // Count only requests that were actually sent.
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Nothing was sent, so no response will arrive to clear the flag.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1451

1452
/*
 * Handle a "create encrypt key" request.
 *
 * Only compiled in when TD_ENTERPRISE or TD_ASTRA_TODO is defined; otherwise
 * the function returns 0 without doing anything.
 *
 * The request is deserialized, the caller's MND_OPER_CONFIG_DNODE privilege is
 * checked, and a config named "encrypt_key" is handed to
 * mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
 * TSDB_CODE_PAR_INTERNAL_ERROR.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }
  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};
  // Length 12 includes the terminating NUL, so this is a whole-string match.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    // Copy everything still needed out of cfgReq before it is freed.
    int32_t dnodeId = cfgReq.dnodeId;
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1481

1482
/**
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code and,
 * once the number of responses received reaches encryptMgmt.nEncrypt, clears the
 * encryptMgmt.encrypting flag.
 *
 * @param pRsp response message; pRsp->code == 0 means the dnode succeeded.
 * @return always 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Update the counter this response belongs to and read the other one as well.
  // Both are needed: the completion check must see the total number of responses,
  // otherwise a mix of successes and failures never reaches nReq and the
  // "encrypting" flag would stay set.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  // `trace` is not referenced directly below; NOTE(review): presumably consumed by
  // the mGInfo() macro - confirm before removing.
  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1506

1507
/**
 * Fill pBlock with a fixed list of configuration name/value rows.
 *
 * The rows are, in order: statusInterval, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope. Each row
 * writes the option name into column 0 and its formatted value into column 1.
 *
 * @param pReq   request message (not used by this function).
 * @param pShow  show object; its numOfRows is increased by the rows written.
 * @param pBlock destination data block.
 * @param rows   requested row count (not consulted by this function).
 * @return number of rows completely written; if a column write fails the error
 *         is logged and the rows written so far are still returned.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t totalRows = 0;
  int32_t numOfRows = 0;
  int32_t code = 0;
  int32_t lino = 0;

// Record one option name together with its formatted value and advance the row count.
#define MND_ADD_CFG_ROW(NAME, FMT, VAL)                                      \
  do {                                                                       \
    cfgOpts[totalRows] = (NAME);                                             \
    (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, FMT, (VAL));   \
    totalRows++;                                                             \
  } while (0)

  MND_ADD_CFG_ROW("statusInterval", "%d", tsStatusInterval);
  MND_ADD_CFG_ROW("timezone", "%s", tsTimezoneStr);
  MND_ADD_CFG_ROW("locale", "%s", tsLocale);
  MND_ADD_CFG_ROW("charset", "%s", tsCharset);
  MND_ADD_CFG_ROW("monitor", "%d", tsEnableMonitor);
  MND_ADD_CFG_ROW("monitorInterval", "%d", tsMonitorInterval);
  MND_ADD_CFG_ROW("slowLogThreshold", "%d", tsSlowLogThreshold);
  MND_ADD_CFG_ROW("slowLogMaxLen", "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  MND_ADD_CFG_ROW("slowLogScope", "%s", scopeStr);

#undef MND_ADD_CFG_ROW

  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t idx = 0; idx < totalRows; ++idx) {
    // Column 0: option name.
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[idx], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, numOfRows, (const char *)nameBuf, false), &lino, _OVER);

    // Column 1: option value.
    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[idx], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, numOfRows, (const char *)valueBuf, false), &lino, _OVER);

    // Only count the row once both columns have been written.
    numOfRows++;
  }

_OVER:
  if (code != 0) {
    mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1578

UNCOV
1579
/**
 * Cancel hook for the configs retrieval; does nothing.
 *
 * @param pMnode unused.
 * @param pIter  unused.
 */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  // Intentionally empty: mndRetrieveConfigs() in this file never stores an iterator in pShow.
}
×
1580

1581
/**
 * Fill pBlock with one row per dnode stored in the SDB.
 *
 * Columns are written in this order: dnode id, endpoint, vnode count
 * (mndGetVnodesNum), numOfSupportVnodes, status string, createdTime, rebootTime,
 * offline reason (empty for online dnodes) and - only when TD_ENTERPRISE is
 * defined - machineId.
 *
 * Iteration state is kept in pShow->pIter, so the scan resumes where the previous
 * call stopped.
 *
 * @param pReq   request message; pReq->info.node is the mnode.
 * @param pShow  show object; pIter is advanced and numOfRows is increased by the
 *               number of rows written.
 * @param pBlock destination data block.
 * @param rows   maximum number of rows to write in this call.
 * @return number of rows completely written. On a column-write or allocation
 *         failure the error is logged and the rows written so far are returned.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  // NOTE(review): RETRIEVE_CHECK_GOTO is defined outside this chunk; it is handed
  // pDnode on every call and presumably releases it before jumping to _OVER - confirm.
  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t vnodesNum = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vnodesNum, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // Status reflects the SDB object state; a trailing '*' (or "offline") marks a
    // dnode that is currently not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Only look up the offline reason for dnodes that are actually offline.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    // Bail out through the common error path instead of writing through NULL.
    RETRIEVE_CHECK_GOTO(b != NULL ? 0 : TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary before the error check so it is not leaked when the
    // check jumps to _OVER.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1663

UNCOV
1664
/**
 * Cancel an in-progress dnode iteration.
 *
 * @param pMnode mnode whose SDB owns the iterator.
 * @param pIter  iterator to cancel; passed to sdbCancelFetchByType() with SDB_DNODE.
 */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1668

1669
/**
 * Collect a duplicated copy of the fqdn of every dnode in the SDB.
 *
 * Each element of the returned array is a `char *` produced by taosStrdup().
 * NOTE(review): the caller presumably owns both the array and the strings and
 * must free them - confirm against the callers.
 *
 * A dnode whose fqdn cannot be duplicated or appended is logged and skipped, so
 * the array never contains NULL entries.
 *
 * @param pMnode mnode whose SDB is scanned.
 * @return the array of fqdn strings (possibly empty), or NULL if the array
 *         itself could not be created.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Do not push a NULL entry; readers of the array expect valid strings.
      mError("failed to duplicate fqdn of dnode:%d, but continue at this time", pObj->id);
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so release the copy here.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc