• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

taosdata / TDengine / #4118

17 May 2025 06:43AM UTC coverage: 62.797% (+0.7%) from 62.054%
#4118

push

travis-ci

web-flow
Merge pull request #31115 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

156841 of 318088 branches covered (49.31%)

Branch coverage included in aggregate %.

176 of 225 new or added lines in 20 files covered. (78.22%)

2989 existing lines in 163 files now uncovered.

242067 of 317143 relevant lines covered (76.33%)

6956088.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.9
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "audit.h"
19
#include "mndCluster.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tconfig.h"
32
#include "tjson.h"
33
#include "tmisce.h"
34
#include "tunit.h"
35

36
#define TSDB_DNODE_VER_NUMBER   2
37
#define TSDB_DNODE_RESERVE_SIZE 40
38

39
/* Human-readable offline reasons; one string per slot, 20 slots in total. */
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
61

62
/* Values made explicit; they match the implicit numbering of the original. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
66

67
/* Values made explicit; they match the implicit numbering of the original. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
72

73
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
74
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
75
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
76
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
79
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
84
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
85
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
87
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
89
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
90
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
91

92
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
94
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
96

97
#ifdef _GRANT
98
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
99
#else
100
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
101
#endif
102

103
/**
 * Register the dnode SDB table together with every message handler and
 * SHOW handler implemented by this file.
 *
 * @param pMnode mnode instance the handlers are attached to.
 * @return the result of sdbSetTable() for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Callbacks for the SDB_DNODE table, keyed by a 32-bit dnode id.
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  // Message handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  // SHOW handlers: one retrieve callback and one iterator-free callback per table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
134

135
/* IP whitelist helpers; only declared here, no definition is visible in this part of the file. */
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);

/* Counterpart of mndInitDnode(); intentionally empty. */
void mndCleanupDnode(SMnode *pMnode) {}
2,188✔
139

140
/**
 * Deploy callback of the dnode table: create dnode 1 from the local
 * fqdn/port and commit it through a "create-dnode" transaction.
 *
 * @param pMnode mnode instance.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t   code = -1;
  SSdbRaw  *pRaw = NULL;
  STrans   *pTrans = NULL;
  char     *machineId = NULL;
  SDnodeObj dnodeObj = {0};

  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId == NULL) {
    // A missing machine id aborts the deploy only in this build configuration.
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  } else {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the sdbFreeRaw() at _OVER becomes a call on NULL.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
193

194
/**
 * Serialize a dnode object into a newly allocated SDB raw record.
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, reserved
 * bytes, then two zero int16 values.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL on failure (terrno is left non-zero).
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  // NOTE(review): code/lino are not used directly here; kept in case the SDB_SET_* macros reference them.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
226

227
/**
 * Rebuild a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1, two
 * length-prefixed binary fields that follow the reserved area are read and
 * discarded.
 *
 * @param pRaw raw record to decode.
 * @return the new row, or NULL on failure (terrno is left non-zero).
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly here; kept in case the SDB_GET_* macros reference them.
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;
  int8_t     sver = 0;
  int32_t    dataPos = 0;

  // Assume failure until the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two length-prefixed fields; their payloads are skipped (NULL destination).
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
278

279
/**
 * Insert callback: mark the dnode as "status not received" and rebuild its
 * "fqdn:port" endpoint string.
 *
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char epBuf[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the object.
  (void)snprintf(epBuf, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);
  return 0;
}
288

289
/**
 * Delete callback: only traces the event.
 *
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);

  return 0;
}
293

294
/**
 * Update callback: copy updateTime from the new row into the existing one.
 * machineId is copied too, but only in TD_ENTERPRISE / TD_ASTRA_TODO builds.
 *
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
302

303
/**
 * Acquire a dnode object by id.
 *
 * On failure the SDB-level terrno is translated to a dnode-level code:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_MND_DNODE_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_DNODE_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_DNODE_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode to look up.
 * @return the acquired object, or NULL with terrno set as described above.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    // Log while terrno still holds the real cause; it is replaced by the generic code right after.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    terrno = TSDB_CODE_APP_ERROR;
  }

  return NULL;
}
321

322
/**
 * Release a dnode object by handing it to sdbRelease() on the mnode's SDB.
 *
 * @param pMnode mnode instance.
 * @param pDnode object to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
2,102,140✔
326

327
/**
 * Build an endpoint set from the dnode's fqdn and port.
 * The result of addEpIntoEpSet() is stored in terrno.
 *
 * @param pDnode dnode whose endpoint is used.
 * @return the endpoint set, returned by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};

  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);

  return result;
}
332

333
/**
 * Look up a dnode by id and return its endpoint set.
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode.
 * @return the dnode's endpoint set, or a zeroed set when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    result = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }

  return result;
}
343

344
/**
 * Find a dnode whose endpoint string matches pEpStr (case-insensitive,
 * compared over at most TSDB_EP_LEN characters).
 *
 * Non-matching objects are released during the scan. The matching object is
 * returned without being released.
 *
 * @return the matching dnode, or NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;

    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pCandidate);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
364

365
/**
 * Same endpoint lookup as mndAcquireDnodeByEp(), but iterating with
 * sdbFetchAll() instead of sdbFetch().
 *
 * Non-matching objects are released during the scan. The matching object is
 * returned without being released. terrno is not modified when nothing matches.
 *
 * @return the matching dnode, or NULL when none matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus objStatus = 0;

    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pCandidate, &objStatus, true);
    if (pIter == NULL) {
      break;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }

  return NULL;
}
385

386
/** Return sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
390

391
/** Return sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
395

396
/**
 * Decide whether a dnode counts as online at time curMs.
 *
 * The dnode is online when |lastAccessTime - curMs| does not exceed
 * 5000 * tsStatusInterval. When it is offline, has a positive rebootTime and
 * its offlineReason is still DND_REASON_ONLINE, the reason is changed to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 *
 * @param pDnode dnode to check (offlineReason may be modified).
 * @param curMs  reference time; presumably milliseconds — TODO confirm.
 * @return true if online, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsed = TABS(pDnode->lastAccessTime - curMs);
  int64_t limit = 5000 * (int64_t)tsStatusInterval;

  if (elapsed <= limit) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
406

407
/**
 * Append one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * A failed array push is logged and the scan continues.
 *
 * @param pMnode    mnode instance.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the object before releasing it.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
9,799✔
433

434
/**
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * @param pMnode     mnode instance.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success; terrno from the failed taosArrayPush() otherwise
 *         (the scan is cancelled at that point).
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so no indeterminate bytes are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
466

467
/*
 * Compare one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
 * with the local global variable of the same name. On mismatch: log, set
 * terrno, and return `err` (a DND_REASON_* value) from the enclosing function.
 *
 * Requires `pCfg` and `pDnode` to be in scope at the expansion site.
 *
 * terrno is set to TSDB_CODE_DNODE_INVALID_MONITOR_PARAS rather than to `err`:
 * `err` is an offline-reason value, not an error code, and every other branch
 * of mndCheckClusterCfgPara stores a TSDB_CODE_* in terrno.
 * NOTE(review): confirm no caller relies on terrno carrying the DND_REASON_* value.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
473

474
/**
 * Check the cluster configuration reported by a dnode against the local
 * settings. Checks run in a fixed order and the first mismatch wins: it is
 * logged, terrno is set, and the matching DND_REASON_* value is returned.
 *
 * @param pMnode mnode instance (checkTime and encryptMgmt.encrypting are read).
 * @param pDnode dnode that sent the configuration (used for logging).
 * @param pCfg   configuration reported by the dnode.
 * @return DND_REASON_ONLINE when everything matches, otherwise the reason of
 *         the first mismatch.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Monitor parameters.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  // Status interval.
  if (tsStatusInterval != pCfg->statusInterval) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
           tsStatusInterval);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }

  // Timezone: rejected only when both the name and checkTime differ.
  if ((taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pCfg->checkTime != pMnode->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  // Locale and charset.
  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  // TTL-change-on-write.
  if (tsTtlChangeOnWrite != pCfg->ttlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Whitelist switch, normalised to 0/1 before comparing.
  int8_t whiteListOn = tsEnableWhiteList ? 1 : 0;
  if (whiteListOn != pCfg->enableWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, whiteListOn);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // Encryption key: skipped while encryptMgmt.encrypting is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
537

538
/**
 * Compute (currentCount - lastCount) / (currentTimeMs - lastTimeMs).
 *
 * @return the rate as a double, or 0.0 unless both the time and the count
 *         have strictly increased.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    return (double)(currentCount - lastCount) / (double)(currentTimeMs - lastTimeMs);
  }

  return 0.0;
}
548

549
/**
 * Fold a vnode load report into the cached vnode state.
 *
 * The applied/commit indexes are always refreshed. The sync fields (state,
 * term, restore, canRead, startTimeMs, roleTimeMs) are refreshed only when at
 * least one of them differs. A reported term of -1 never counts as a difference.
 *
 * @param vgId   vgroup id, used for logging.
 * @param pGid   cached vnode state, updated in place.
 * @param pVload load reported by the dnode.
 * @return true if the sync fields were refreshed.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // Evaluated against the cached values before anything below overwrites them.
  bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  bool roleChanged = (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);

  // Applied rate is tracked only while the commit index is ahead of the applied index.
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate =
          calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;

  bool needRefresh = roleChanged || (pGid->syncRestore != pVload->syncRestore) ||
                     (pGid->syncCanRead != pVload->syncCanRead) || (pGid->startTimeMs != pVload->startTimeMs);
  if (!needRefresh) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
586

587
/**
 * Fold an mnode load report into the cached mnode object.
 *
 * syncState, syncTerm, syncRestore and roleTimeMs are refreshed when any of
 * them differs. A reported term of -1 never counts as a difference.
 *
 * @param pObj   cached mnode object, updated in place.
 * @param pMload load reported by the dnode.
 * @return true if the object was refreshed.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  bool roleChanged = (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
604

605
extern char   *tsMonFwUri;
606
extern char   *tsMonSlowLogUri;
607
/**
 * Handle TDMT_MND_STATIS: deserialize the request and pass its content to
 * monSendContent(), with tsMonFwUri for counter data and tsMonSlowLogUri for
 * slow-log data. Other types are ignored.
 *
 * @param pReq incoming RPC message.
 * @return the deserialization error if there is one, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
627

628
/**
 * Handle TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete are
 * set, deserialize the request and record it with auditAddRecord().
 *
 * @param pReq incoming RPC message.
 * @return the deserialization error if there is one, otherwise 0.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  // Nothing to do unless both switches are on.
  if (!(tsEnableAudit && tsEnableAuditDelete)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen);

  tFreeSAuditReq(&auditReq);
  return 0;
}
645

646
/**
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message carrying the dnode's id and
 * machineId onto the mnode write queue.
 *
 * @param pMnode mnode instance whose msgCb is used to enqueue.
 * @param pDnode dnode whose id/machineId are sent.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer to obtain the required length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Same mapping as the sizing pass, so a 0 return is not reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // The buffer was never handed to the queue, so it is freed here.
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  // NOTE(review): from here pReq is handed to tmsgPutToQueue(); confirm it frees the buffer on failure.
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
676

677
/**
 * Handle TDMT_MND_UPDATE_DNODE_INFO: refresh the dnode's updateTime and
 * persist the object through an "update-dnode-obj" transaction.
 *
 * A NULL result from acquire/create/encode always aborts the request, even
 * when terrno happens to be 0; TSDB_CODE_MND_RETURN_VALUE_NULL is used as the
 * fallback code in that case.
 *
 * @param pReq incoming RPC message carrying a serialized SDnodeInfoReq.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the sdbFreeRaw() at _exit becomes a call on NULL.
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
723

724
// Handles the periodic status report sent by a dnode.
//
// Steps, in order:
//   1. deserialize the SStatusReq and reject reports carrying a different cluster id;
//   2. locate the dnode object by id, falling back to lookup by endpoint;
//   3. copy the reported vnode / mnode / qnode load into the matching objects;
//   4. when anything relevant changed (needCheck), validate version, cluster id and
//      cluster configuration, refresh the dnode's attributes and build an SStatusRsp
//      that is attached to pReq->info.rsp;
//   5. bump the dnode's access counter and last access time.
//
// The value returned is whatever mndUpdClusterInfo(pReq) returns; the local `code`
// and the terrno values set on the error paths are not returned directly.
// NOTE(review): confirm that dropping `code` at the end is intentional.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has no id yet, so it can only be found by its endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;  // saved because the lookups below may overwrite terrno
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // Endpoint is known but registered under another id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  // Any of these differences forces the full validation + response path below.
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Fold every reported vnode load into its vgroup.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the (assigned) leader replica.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // Machine id changed: copy it and persist the dnode object.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only measures the encoded size; a negative size must not be
    // handed to the allocator.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Without this check a NULL response with a positive length would be
      // attached to the request.
      taosArrayDestroy(statusRsp.pDnodeEps);
      if (terrno == 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      rpcFreeCont(pHead);  // not attached to pReq yet, so it is released here
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
945

946
// Handles a lightweight notify message from a dnode: copies the reported
// time-series count of each vnode into its vgroup, then refreshes the cluster
// info. A request from a different cluster (non-zero, non-matching cluster id)
// is rejected with TSDB_CODE_MND_DNODE_DIFF_CLUSTER.
//
// Returns 0 / the result of mndUpdClusterInfo() on the normal path, otherwise
// the deserialization or cluster-mismatch error code.
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq req = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &req);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);

    if (req.clusterId != 0 && req.clusterId != localClusterId) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", req.dnodeId,
            req.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t numOfLoads = taosArrayGetSize(req.pVloads);

      for (int32_t idx = 0; idx < numOfLoads; ++idx) {
        SVnodeLoadLite *pLoad = taosArrayGet(req.pVloads, idx);
        SVgObj         *pVgroup = mndAcquireVgroup(pMnode, pLoad->vgId);

        // Loads for vgroups that cannot be acquired are ignored.
        if (pVgroup == NULL) continue;

        pVgroup->numOfTimeSeries = pLoad->nTimeSeries;
        mndReleaseVgroup(pMnode, pVgroup);
      }

      code = mndUpdClusterInfo(pReq);
    }
  }

  tFreeSNotifyReq(&req);
  return code;
}
979

980
// Creates a new dnode object from a create request and commits it through a
// rollback-policy, globally conflicting transaction named "create-dnode".
//
// pMnode:  mnode whose sdb receives the new object.
// pReq:    originating request, attached to the transaction.
// pCreate: supplies the fqdn and port of the new dnode.
// Returns 0 once the transaction has been prepared, otherwise an error code.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTxn = NULL;
  SSdbRaw  *pCommitRaw = NULL;
  SDnodeObj newDnode = {0};

  // Populate the in-memory object; the id is taken from sdbGetMaxId().
  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  newDnode.createdTime = taosGetTimestampMs();
  newDnode.updateTime = newDnode.createdTime;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTxn = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTxn == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTxn->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTxn), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&newDnode);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTxn, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  pCommitRaw = NULL;  // cleared so the cleanup path does not free it

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTxn), NULL, _OVER);
  code = 0;

  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, newDnode.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value
_OVER:
  mndTransDrop(pTxn);
  sdbFreeRaw(pCommitRaw);
  return code;
}
1022

1023
// Answers a dnode-list request: walks every SDB_DNODE object, collects its id
// and single-endpoint epSet into an SDnodeListRsp, serializes it and attaches
// the buffer to pReq->info.rsp / pReq->info.rspLen.
//
// Returns 0 on success, otherwise a non-zero error code (via TAOS_RETURN).
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      // code keeps its initial -1 when terrno was not set by the array helper.
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only measures the encoded size; never allocate from a
  // non-positive length.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // Guarantee a non-zero code so an allocation failure is never reported as success.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
  if (rspLen <= 0) {
    // A zero length is treated as a failure too; it must not map to code == 0.
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  pRsp = NULL;  // now referenced by the request; must not be freed below
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  // Release the response buffer if it was allocated but never attached.
  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1084

1085
void getSlowLogScopeString(int32_t scope, char *result) {
7✔
1086
  if (scope == SLOW_LOG_TYPE_NULL) {
7!
1087
    (void)strncat(result, "NONE", 64);
×
1088
    return;
×
1089
  }
1090
  while (scope > 0) {
14✔
1091
    if (scope & SLOW_LOG_TYPE_QUERY) {
7!
1092
      (void)strncat(result, "QUERY", 64);
7✔
1093
      scope &= ~SLOW_LOG_TYPE_QUERY;
7✔
UNCOV
1094
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
UNCOV
1095
      (void)strncat(result, "INSERT", 64);
×
UNCOV
1096
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
UNCOV
1097
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
UNCOV
1098
      (void)strncat(result, "OTHERS", 64);
×
UNCOV
1099
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1100
    } else {
1101
      (void)printf("invalid slow log scope:%d", scope);
×
1102
      return;
×
1103
    }
1104

1105
    if (scope > 0) {
7!
UNCOV
1106
      (void)strncat(result, "|", 64);
×
1107
    }
1108
  }
1109
}
1110

1111
// Handles CREATE DNODE: checks the dnode and cpu-core grants, deserializes the
// request, verifies the caller's privilege, validates the endpoint, refuses a
// duplicate endpoint, then delegates to mndCreateDnode() and writes an audit
// record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was started,
// otherwise the error code of the failing step (via TAOS_RETURN).
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  int32_t         code = -1;
  char            ep[TSDB_EP_LEN];
  char            obj[200] = {0};

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) goto _OVER;

  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) goto _OVER;

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  // The endpoint needs a non-empty fqdn and a port in (0, UINT16_MAX].
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  // The audit record is written whether or not mndCreateDnode() succeeded.
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1159

1160
// Implementation hook for RESTORE DNODE; the definition below is only compiled
// when TD_ENTERPRISE is not defined.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for RESTORE DNODE requests: forwards to the implementation hook
// and returns its result unchanged.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  int32_t result = mndProcessRestoreDnodeReqImpl(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1167

1168
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
26✔
1169
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
1170
  int32_t  code = -1;
26✔
1171
  SSdbRaw *pRaw = NULL;
26✔
1172
  STrans  *pTrans = NULL;
26✔
1173

1174
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
26✔
1175
  if (pTrans == NULL) {
26!
1176
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1177
    if (terrno != 0) code = terrno;
×
1178
    goto _OVER;
×
1179
  }
1180
  mndTransSetSerial(pTrans);
26✔
1181
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
26!
1182
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
26!
1183

1184
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1185
  if (pRaw == NULL) {
26!
1186
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1187
    if (terrno != 0) code = terrno;
×
1188
    goto _OVER;
×
1189
  }
1190
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
26!
1191
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
26!
1192
  pRaw = NULL;
26✔
1193

1194
  pRaw = mndDnodeActionEncode(pDnode);
26✔
1195
  if (pRaw == NULL) {
26!
1196
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1197
    if (terrno != 0) code = terrno;
×
1198
    goto _OVER;
×
1199
  }
1200
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
26!
1201
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
26!
1202
  pRaw = NULL;
26✔
1203

1204
  if (pMObj != NULL) {
26✔
1205
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
4!
1206
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
4!
1207
  }
1208

1209
  if (pQObj != NULL) {
26✔
1210
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1211
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
3!
1212
  }
1213

1214
  if (pSObj != NULL) {
26✔
1215
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
3!
1216
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
3!
1217
  }
1218

1219
  if (numOfVnodes > 0) {
26✔
1220
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
16!
1221
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
16✔
1222
  }
1223

1224
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
24!
1225

1226
  code = 0;
24✔
1227

1228
_OVER:
26✔
1229
  mndTransDrop(pTrans);
26✔
1230
  sdbFreeRaw(pRaw);
26✔
1231
  TAOS_RETURN(code);
26✔
1232
}
1233

1234
// Reports whether a dnode hosts nothing: no qnode, no snode, no mnode and no
// vnodes. The checks run in that order and stop at the first component found.
//
// Returns true only when every check comes back empty.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, dnodeId);

  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  // Each release receives either the acquired object or NULL.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1259

1260
// Handles DROP DNODE.
//
// Flow: deserialize the request, check the caller's privilege, locate the dnode
// (by id, then by "fqdn:port" endpoint), refuse to drop the last mnode or the
// mnode running on this node, enforce the online/offline + force rules, then
// start the drop transaction and write an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started,
// otherwise the error code of the failing step (via TAOS_RETURN).
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although this
  // handler drops a dnode - confirm whether a dnode-specific operation is intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;  // saved because the endpoint lookup may overwrite terrno
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build "fqdn:port" with a literal format string. The request's fqdn must
    // never be used as the format itself: it is caller-controlled data.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually acquired: when it was found by
  // endpoint, dropReq.dnodeId does not identify it, and the mnode/qnode/snode
  // lookups and the leader check below would otherwise be skipped for it.
  int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  // A forced drop is only allowed while the dnode is offline.
  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  // An offline dnode that still hosts something can only be dropped with force.
  bool isEmpty = mndIsEmptyDnode(pMnode, dnodeId);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1344

1345
// Fans a "create encrypt key" request out to the dnodes.
//
// pReq:     originating request (used for the mnode handle and trace id).
// dnodeId:  -1 or 0 targets every online dnode, any other value only that dnode.
// pDcfgReq: config request to forward; its value is the key and must be between
//           ENCRYPT_KEY_LEN_MIN and ENCRYPT_KEY_LEN characters long.
//
// The encryptMgmt.encrypting flag serves as a busy marker: it is claimed with a
// compare-and-exchange before any request is sent and cleared here when no
// request was sent at all.
// Returns 0 on success, otherwise an error code (also stored in terrno when
// terrno was still 0).
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the busy flag; a non-zero previous value means another run is active.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      // Sending is best-effort: a dnode whose buffer cannot be allocated is skipped.
      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          // Serialization failed: release everything held for this iteration
          // and leave the loop, so the check after the loop can clear the busy
          // flag when nothing was sent. A zero length must not map to success.
          code = (bufLen < 0) ? bufLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the flag, so clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1422

1423
// Entry point for the "create encrypt key" request.
//
// When TD_ENTERPRISE or TD_ASTRA_TODO is defined: deserializes the
// SMCfgDnodeReq, checks the MND_OPER_CONFIG_DNODE privilege, and - only for the
// "encrypt_key" config item - forwards config/value to
// mndProcessCreateEncryptKeyReqImpl(). Any other config item yields
// TSDB_CODE_PAR_INTERNAL_ERROR. Otherwise the function just returns 0.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  // Comparing 12 bytes includes the terminating NUL, so this is an exact,
  // case-insensitive match of the whole name.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    // Copy everything still needed out of cfgReq before it is freed, so no
    // field of the request is read after tFreeSMCfgDnodeReq().
    int32_t dnodeId = cfgReq.dnodeId;
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1452

1453
// Handles one dnode's response to a "create encrypt key" request.
//
// Increments the success or failure counter depending on pRsp->code and clears
// the encryptMgmt.encrypting flag once successes plus failures reach the
// number of requests recorded in nEncrypt.
// Always returns 0.
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Bump the counter for this response and read the other one as well: the
  // completion check needs both totals. With only the bumped counter known
  // locally, a mix of successes and failures would never reach nReq and the
  // flag would stay set.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1477

1478
/*
 * Fill pBlock with one (name, value) row per cluster-level config option.
 *
 * The option list is fixed: statusInterval, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope, each read
 * from the corresponding global. Column 0 of the block receives the option
 * name and column 1 its value, both as var-strings.
 *
 * The `rows` argument is not consulted; every option is written on each call.
 *
 * Returns the number of rows written, which is also added to pShow->numOfRows.
 * If setting a column value fails, the error is logged and the rows written so
 * far are returned.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t totalRows = 0;
  int32_t numOfRows = 0;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t cols = 0;
  int32_t code = 0;  // assigned by TAOS_CHECK_GOTO
  int32_t lino = 0;

  // Collect the option names and their formatted values; totalRows advances
  // once per option.
  cfgOpts[totalRows] = "statusInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);

  cfgOpts[totalRows] = "timezone";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[totalRows] = "locale";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[totalRows] = "charset";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[totalRows] = "monitor";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[totalRows] = "monitorInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[totalRows] = "slowLogThreshold";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[totalRows] = "slowLogMaxLen";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[totalRows] = "slowLogScope";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  // Scratch buffers holding the var-string encoding of one name / one value.
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t i = 0; i < totalRows; i++) {
    cols = 0;

    // Column 0: option name.
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);

    // Column 1: option value.
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);

    numOfRows++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1549

UNCOV
1550
/*
 * Cancel callback for the configs retrieval: intentionally does nothing.
 * mndRetrieveConfigs in this file never stores anything in pShow->pIter, so
 * there is no iterator to release. Both parameters are unused.
 */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1551

1552
/*
 * Fill pBlock with up to `rows` dnode rows, continuing from pShow->pIter.
 *
 * Columns are written in this order for each dnode: id, endpoint, number of
 * vnodes, numOfSupportVnodes, status string, createdTime, rebootTime, offline
 * reason (empty when the dnode is online) and, in TD_ENTERPRISE builds,
 * machineId.
 *
 * Returns the number of complete rows written, which is also added to
 * pShow->numOfRows. On failure the error is logged and the rows completed so
 * far are returned.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;  // assigned by RETRIEVE_CHECK_GOTO
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    // dnode id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // endpoint, bounded by the schema width of the column about to be written
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // number of vnodes on this dnode
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // status: derived from the sdb object status; an offline dnode is reported
    // as "offline", or with a trailing '*' while it is being created/dropped
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // offline reason: empty for online dnodes
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Take the same error exit as every other failure in this loop instead of
      // writing through a NULL pointer below.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary before checking the result so that the error exit,
    // which jumps straight to _OVER, cannot leak it.
    int32_t setCode = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(setCode, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1634

1635
/*
 * Cancel an in-progress dnode iteration: hands pIter to sdbCancelFetchByType
 * for the SDB_DNODE table of this mnode's sdb.
 */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1639

1640
/*
 * Collect a copy of the FQDN of every dnode in the sdb.
 *
 * Returns an SArray of `char *`, each element a string duplicated with
 * taosStrdup, or NULL if the array itself cannot be created. A dnode whose
 * FQDN cannot be duplicated or appended is logged and skipped, so the array
 * never contains NULL elements.
 *
 * NOTE(review): presumably the caller owns the array and the strings in it -
 * confirm against the call sites.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Do not store a NULL element in the result.
      mError("failed to dup fqdn of dnode, but continue at this time");
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy must be released here.
      taosMemoryFree(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc