• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.96
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "audit.h"
19
#include "mndCluster.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tconfig.h"
32
#include "tjson.h"
33
#include "tmisce.h"
34
#include "tunit.h"
35

36
#define TSDB_DNODE_VER_NUMBER   2
37
#define TSDB_DNODE_RESERVE_SIZE 40
38

39
// Human-readable descriptions of why a dnode is considered offline.
// NOTE(review): presumably indexed by the DND_REASON_* codes stored in
// SDnodeObj.offlineReason; that enum is declared outside this file, so the
// ordering here must be confirmed against it whenever either side changes.
static const char *offlineReason[] = {
40
    "",
41
    "status msg timeout",
42
    "status not received",
43
    "version not match",
44
    "dnodeId not match",
45
    "clusterId not match",
46
    "statusInterval not match",
47
    "timezone not match",
48
    "locale not match",
49
    "charset not match",
50
    "ttlChangeOnWrite not match",
51
    "enableWhiteList not match",
52
    "encryptionKey not match",
53
    "monitor not match",
54
    "monitor switch not match",
55
    "monitor interval not match",
56
    "monitor slow log threshold not match",
57
    "monitor slow log sql max len not match",
58
    "monitor slow log scope not match",
59
    "unknown",
60
};
61

62
// Selector constants for the two kinds of active code.
// NOTE(review): not referenced in the visible part of this file; the exact
// meaning of each value should be confirmed at its use site.
enum {
63
  DND_ACTIVE_CODE,
64
  DND_CONN_ACTIVE_CODE,
65
};
66

67
// Dnode operation kinds (create / add / drop).
// NOTE(review): not referenced in the visible part of this file; confirm the
// intended use at the call sites further down.
enum {
68
  DND_CREATE,
69
  DND_ADD,
70
  DND_DROP,
71
};
72

73
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
74
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
75
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
76
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
79
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
84
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
85
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
87
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
89
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
90
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
91

92
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
94
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
96

97
#ifdef _GRANT
98
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
99
#else
100
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
101
#endif
102

103
/*
 * Registers the dnode object table with sdb and installs the message handlers
 * and SHOW retrieve/cancel callbacks implemented in this file.
 *
 * pMnode: mnode instance whose handler tables and sdb are populated.
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Start from an all-zero descriptor so every field not set below stays 0/NULL.
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  // Request/response handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  // SHOW support: row retrieval plus iterator cancellation for each table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
134

135
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
136
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
137
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
138
// Counterpart of mndInitDnode(); this module has nothing to tear down.
void mndCleanupDnode(SMnode *pMnode) {
}
139

140
/*
 * Deploy callback of the dnode sdb table (installed as .deployFp in
 * mndInitDnode): creates dnode 1 from the local fqdn/port through a
 * "create-dnode" transaction.
 *
 * pMnode: mnode that owns the sdb being deployed.
 * Returns 0 on success, otherwise an error code. On an enterprise build
 * without GRANTS_CFG a missing machine id is reported as
 * TSDB_CODE_DNODE_NO_MACHINE_CODE.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // Set the return code as well as terrno: `code` still holds whatever
    // tGetMachineId() returned, which could be 0 and would make this failure
    // look like a successful deploy.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free a raw that was appended to the
  // transaction.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
                                   1);  // TODO: check the return value

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
193

194
/*
 * Serializes a dnode object into a newly allocated sdb raw record
 * (version TSDB_DNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, port, fqdn, machineId, the
 * reserved area, then two zero int16 values kept for forward/backward
 * compatibility.
 *
 * Returns the raw record, or NULL with terrno set when encoding fails.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  // NOTE(review): `code` and `lino` are not used directly here; they are kept
  // because the SDB_SET_* macros may reference them — confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
226

227
/*
 * Rebuilds a dnode row from an sdb raw record.
 *
 * Soft versions 1..TSDB_DNODE_VER_NUMBER are accepted. For versions above 1
 * two length-prefixed binary fields follow the reserved area; they are read
 * and discarded.
 *
 * Returns the allocated row, or NULL with terrno set on failure.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): `code` and `lino` are kept because the SDB_GET_* macros may
  // reference them — confirm before removing.
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Assume failure until the whole record has been parsed.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two (length, bytes) pairs whose payload is skipped.
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
278

279
/*
 * sdb insert hook: marks the new dnode as "status not received" and fills in
 * its "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a scratch buffer first, then copy into the object.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
288

289
/*
 * sdb delete hook: only traces the event. Always returns 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t result = 0;

  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return result;
}
293

294
/*
 * sdb update hook: copies the mutable fields of pNew into the stored object
 * pOld. updateTime is always copied; machineId only when TD_ENTERPRISE or
 * TD_ASTRA_TODO is defined. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
302

303
/*
 * Looks up a dnode by id in sdb.
 *
 * On success the object returned by sdbAcquire() is handed to the caller, who
 * gives it back with mndReleaseDnode(). On failure NULL is returned and the
 * sdb error in terrno is translated to the matching dnode-level code
 * (not exist / in creating / in dropping); any other error becomes
 * TSDB_CODE_APP_ERROR and is logged as fatal.
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  // Capture the sdb error before terrno is overwritten so the log can report
  // the real cause instead of the generic replacement code.
  int32_t sdbErr = terrno;
  if (sdbErr == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (sdbErr == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, tstrerror(sdbErr));
  }

  return NULL;
}
321

322
// Gives a dnode object obtained from this mnode's sdb back via sdbRelease().
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
326

327
/*
 * Builds an endpoint set containing the dnode's fqdn/port.
 * The result of addEpIntoEpSet() is stored in terrno; the (possibly empty)
 * set is returned by value.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet  result = {0};
  int32_t rc = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  terrno = rc;
  return result;
}
332

333
/*
 * Same as mndGetDnodeEpset() but looks the dnode up by id first.
 * Returns a zeroed endpoint set when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pFound = mndAcquireDnode(pMnode, dnodeId);

  if (pFound != NULL) {
    result = mndGetDnodeEpset(pFound);
    mndReleaseDnode(pMnode, pFound);
  }

  return result;
}
343

344
/*
 * Scans the dnode table (sdbFetch) for an entry whose endpoint string matches
 * pEpStr, compared case-insensitively over at most TSDB_EP_LEN characters.
 *
 * Returns the matching object without releasing it (the caller is expected to
 * release it), or NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pCandidate = NULL;
  void      *pCursor = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pCandidate);

  while (pCursor != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      // Stop iterating; the matched object is handed to the caller unreleased.
      sdbCancelFetch(pSdb, pCursor);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
364

UNCOV
365
/*
 * Like mndAcquireDnodeByEp() but iterates with sdbFetchAll(), which also
 * reports each object's sdb status.
 *
 * Returns the first entry whose endpoint matches pEpStr (case-insensitive),
 * unreleased, or NULL when there is none. terrno is not touched here.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus status = 0;

    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &status, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      sdbCancelFetch(pSdb, pCursor);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
  }
}
385

386
// Returns sdbGetSize() for the SDB_DNODE table.
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
390

391
// Returns sdbGetSize() for the SDB_DB table.
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
395

396
/*
 * Decides whether a dnode counts as online at time curMs.
 *
 * The dnode is online when the absolute distance between its lastAccessTime
 * and curMs is at most 5000 * tsStatusInterval. When it is not, and it had
 * rebooted (rebootTime > 0) while still marked DND_REASON_ONLINE, its
 * offlineReason is switched to DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t sinceAccess = TABS(pDnode->lastAccessTime - curMs);
  const int64_t timeout = 5000 * (int64_t)tsStatusInterval;

  if (sinceAccess <= timeout) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
406

407
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode) to pDnodeEps for every dnode
 * returned by sdbFetchAll().
 *
 * A failed array push is logged and the iteration continues (best effort), so
 * the resulting array may be incomplete.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after sdbRelease(); use the id that was
    // copied into dnodeEp above.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
433

434
/*
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * Returns 0 on success. When an array push fails the iteration is cancelled
 * and terrno is returned; entries pushed so far stay in the array.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so no indeterminate bytes (fields not set below,
    // padding) are copied into the array.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after sdbRelease(); use the copied id.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
466

467
/*
 * Compares one monitor parameter reported by a dnode (pCfg->monitorParas.para)
 * with the global of the same name. On mismatch it logs, sets terrno and
 * returns `err` (a DND_REASON_* offline reason) from the enclosing function.
 *
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site.
 * terrno is set to TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, the same code the
 * tsSlowLogExceptDb check uses, rather than to the DND_REASON_* value, which
 * is an offline reason and not an error code.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                    \
  if (pCfg->monitorParas.para != para) {                                                                 \
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
    return err;                                                                                          \
  }
473

474
/*
 * Validates the cluster-wide settings reported by a dnode against this
 * mnode's own configuration.
 *
 * Checks run in a fixed order: monitor parameters, status interval, timezone
 * (only a mismatch when checkTime differs too), locale, charset,
 * ttlChangeOnWrite, enableWhiteList and, unless an encryption operation is in
 * progress, the encryption key state/checksum.
 *
 * Returns DND_REASON_ONLINE when everything matches; otherwise the
 * DND_REASON_* value of the first mismatch, with terrno set.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Monitor settings.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  // Heartbeat interval.
  if (tsStatusInterval != pCfg->statusInterval) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
           tsStatusInterval);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }

  // Timezone: a different name is tolerated when checkTime agrees.
  if ((taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0) && (pCfg->checkTime != pMnode->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (tsTtlChangeOnWrite != pCfg->ttlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // The local flag is normalised to 0/1 before it is compared.
  int8_t enable = tsEnableWhiteList ? 1 : 0;
  if (enable != pCfg->enableWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // Key comparison is skipped while the `encrypting` flag is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    if (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
537

UNCOV
538
/*
 * Rate of progress between two samples, in counts per millisecond.
 *
 * Returns (currentCount - lastCount) / (currentTimeMs - lastTimeMs) when both
 * the count and the time have strictly advanced, otherwise 0.0.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    const int64_t applied = currentCount - lastCount;
    const int64_t elapsedMs = currentTimeMs - lastTimeMs;
    return (double)applied / (double)elapsedMs;
  }

  return 0.0;
}
548

549
/*
 * Folds the load reported for one vnode replica (pVload) into the stored
 * vgroup member (pGid).
 *
 * - While the reported commit index is ahead of the applied index, the
 *   applied rate and its timestamp are refreshed.
 * - The applied/commit indexes are always copied.
 * - Sync state, term, restore/canRead flags, start time and role time are
 *   copied only when something changed: state, term (a reported term of -1 is
 *   ignored), role time, restore flag, canRead flag or start time.
 *
 * Returns true when that last group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  const bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleChanged =
      (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First observation of a lagging replica: only start the clock.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);
      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;

  const bool changed = roleChanged || (pGid->syncRestore != pVload->syncRestore) ||
                       (pGid->syncCanRead != pVload->syncCanRead) || (pGid->startTimeMs != pVload->startTimeMs);
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
586

587
/*
 * Folds the load reported for an mnode (pMload) into the stored mnode object.
 *
 * The sync state, term, restore flag and role time are copied when the state,
 * the term (a reported term of -1 is ignored), the role time or the restore
 * flag differs. Returns true when the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleChanged =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleChanged && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
604

605
extern char   *tsMonFwUri;
606
extern char   *tsMonSlowLogUri;
UNCOV
607
/*
 * Handler for TDMT_MND_STATIS: decodes the statistics payload and forwards
 * its content with monSendContent() to tsMonFwUri (counter type) or
 * tsMonSlowLogUri (slow-log type). Other types are decoded and dropped.
 *
 * Returns the deserialization error if decoding fails, otherwise 0.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &req));

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", req.pCont);
  }

  if (req.type == MONITOR_TYPE_COUNTER) {
    monSendContent(req.pCont, tsMonFwUri);
  } else if (req.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(req.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&req);
  return 0;
}
627

UNCOV
628
/*
 * Handler for TDMT_MND_AUDIT: when both tsEnableAudit and tsEnableAuditDelete
 * are set, decodes the audit request and records it with auditAddRecord().
 *
 * Returns the deserialization error if decoding fails, otherwise 0 (also when
 * auditing is disabled).
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || !tsEnableAuditDelete) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq req = {0};

  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &req));

  mDebug("received audit req:%s, %s, %s, %s", req.operation, req.db, req.table, req.pSql);

  auditAddRecord(pReq, pMnode->clusterId, req.operation, req.db, req.table, req.pSql, req.sqlLen);

  tFreeSAuditReq(&req);
  return 0;
}
645

646
/*
 * Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue that
 * carries the dnode's id and machine id.
 *
 * pMnode: mnode whose message callback is used to enqueue the request.
 * pDnode: dnode whose id/machineId are serialized.
 * Returns 0 when the message was queued, otherwise an error code.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer only computes the encoded length.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // Never report success for an empty encoding, and release the buffer that
    // was not handed to the queue.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): buffer ownership after tmsgPutToQueue() is not visible from
  // this file; pReq is deliberately not freed here to avoid a double free —
  // confirm against tmsgPutToQueue's failure path.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
676

677
/*
 * Handler for TDMT_MND_UPDATE_DNODE_INFO: stamps the target dnode with the
 * current time and persists it through an "update-dnode-obj" transaction
 * (rollback policy).
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Cleared so the cleanup below does not free a raw that was appended to the
  // transaction.
  pCommitRaw = NULL;

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
723

724
/*
 * Handle a periodic status report sent by a dnode.
 *
 * Steps:
 *   1. decode the request and reject reports from a different cluster;
 *   2. locate the dnode object (by endpoint on first access, by id otherwise);
 *   3. copy vnode/mnode/qnode load information into the matching objects;
 *   4. when anything relevant changed (needCheck), validate version, cluster id
 *      and cluster configuration, refresh the dnode object and build a status
 *      response that is attached to pReq->info.rsp;
 *   5. update access statistics.
 *
 * Failures detected here are recorded in `code` and/or `terrno`; the value
 * returned to the caller is always the result of mndUpdClusterInfo(pReq).
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    goto _OVER;
  }

  if (statusReq.dnodeId == 0) {
    // First access: the dnode does not know its id yet, so look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        terrno = err;
        goto _OVER;
      }

      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        terrno = err;
        goto _OVER;
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Propagate the reported vnode loads into the vgroup objects.
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only a (assigned) leader's numbers are copied into the vgroup.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing timestamps default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pDb != NULL && pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
                pDb->stateTs, curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      goto _OVER;
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        goto _OVER;
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    if (pDnode->offlineReason != 0) {
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
      goto _OVER;
    }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
            statusReq.dnodeVer, dnodeVer, reboot);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: record it and persist the dnode object.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        goto _OVER;
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;

    // First pass only computes the encoded length; never allocate from a
    // negative (error) length.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      goto _OVER;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Do not publish a NULL response buffer with a positive length.
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer was never handed to pReq, so release it here.
      rpcFreeCont(pHead);
      code = contLen;
      goto _OVER;
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  return mndUpdClusterInfo(pReq);
}
945

946
/*
 * Handle a notify request from a dnode: copy the reported time-series count of
 * every listed vnode into its vgroup object, then refresh the cluster info.
 *
 * Returns 0 on success, the deserialization error (also stored in terrno),
 * TSDB_CODE_MND_DNODE_DIFF_CLUSTER for a foreign cluster id, or the result of
 * mndUpdClusterInfo().
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  // A cluster id of 0 is accepted without comparison.
  int64_t localClusterId = mndGetClusterId(pMnode);
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != localClusterId) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t i = 0; i < numOfLoads; ++i) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, i);
    SVgObj         *pVgroup = mndAcquireVgroup(pMnode, pLoad->vgId);

    // Vgroups that cannot be acquired are silently skipped.
    if (pVgroup == NULL) continue;

    pVgroup->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVgroup);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
979

UNCOV
980
/*
 * Create a new dnode object from a create request and submit it through a
 * "create-dnode" transaction.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pRaw = NULL;
  SDnodeObj dnodeObj = {0};

  // Fill in the new object; the endpoint is "<fqdn>:<port>".
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // Cleared so the cleanup below does not free the raw again.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

  // TODO: check the return value
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1022

UNCOV
1023
/*
 * Build the list of all dnode endpoints and attach it to the request as the
 * response payload.
 *
 * Each dnode in the SDB contributes one single-endpoint SEpSet (fqdn + port).
 *
 * Returns 0 on success; on failure returns an error code and leaves
 * pReq->info.rsp untouched.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  void         *pRsp = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SEpSet epSet = {0};
    epSet.numOfEps = 1;
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the loop early: drop the row and cancel the iteration.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass only computes the encoded length; never allocate from a
  // negative (error) length.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // Never report success when no buffer could be allocated.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
  if (rspLen <= 0) {
    // An empty encoding must not be reported as success, and the buffer was
    // never handed to pReq, so it is released here.
    code = (rspLen < 0) ? rspLen : TSDB_CODE_MND_RETURN_VALUE_NULL;
    rpcFreeCont(pRsp);
    pRsp = NULL;
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1083

UNCOV
1084
void getSlowLogScopeString(int32_t scope, char *result) {
×
UNCOV
1085
  if (scope == SLOW_LOG_TYPE_NULL) {
×
1086
    (void)strncat(result, "NONE", 64);
×
1087
    return;
×
1088
  }
UNCOV
1089
  while (scope > 0) {
×
UNCOV
1090
    if (scope & SLOW_LOG_TYPE_QUERY) {
×
UNCOV
1091
      (void)strncat(result, "QUERY", 64);
×
UNCOV
1092
      scope &= ~SLOW_LOG_TYPE_QUERY;
×
UNCOV
1093
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
UNCOV
1094
      (void)strncat(result, "INSERT", 64);
×
UNCOV
1095
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
UNCOV
1096
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
UNCOV
1097
      (void)strncat(result, "OTHERS", 64);
×
UNCOV
1098
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1099
    } else {
1100
      (void)printf("invalid slow log scope:%d", scope);
×
1101
      return;
×
1102
    }
1103

UNCOV
1104
    if (scope > 0) {
×
UNCOV
1105
      (void)strncat(result, "|", 64);
×
1106
    }
1107
  }
1108
}
1109

UNCOV
1110
/*
 * Handle a CREATE DNODE request.
 *
 * Checks the grants, decodes the request, verifies the caller's privilege and
 * the endpoint, rejects an endpoint that already exists, and then starts the
 * create transaction. An audit record is written once mndCreateDnode() has
 * been called, whether or not it succeeded.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 * otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int32_t         code = -1;

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);

  // The endpoint needs a non-empty fqdn and a port in (0, UINT16_MAX].
  bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  char obj[200] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1158

1159
// Actual restore-dnode handler. A do-nothing stub is supplied below when
// TD_ENTERPRISE is not defined; otherwise the definition is presumably
// provided by the enterprise build (not visible here).
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests: forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub implementation: ignores the request and reports success.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  return 0;
}
#endif
1166

UNCOV
1167
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
×
1168
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
UNCOV
1169
  int32_t  code = -1;
×
UNCOV
1170
  SSdbRaw *pRaw = NULL;
×
UNCOV
1171
  STrans  *pTrans = NULL;
×
1172

UNCOV
1173
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
×
UNCOV
1174
  if (pTrans == NULL) {
×
1175
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1176
    if (terrno != 0) code = terrno;
×
1177
    goto _OVER;
×
1178
  }
UNCOV
1179
  mndTransSetSerial(pTrans);
×
UNCOV
1180
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
×
UNCOV
1181
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1182

UNCOV
1183
  pRaw = mndDnodeActionEncode(pDnode);
×
UNCOV
1184
  if (pRaw == NULL) {
×
1185
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1186
    if (terrno != 0) code = terrno;
×
1187
    goto _OVER;
×
1188
  }
UNCOV
1189
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
1190
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
×
UNCOV
1191
  pRaw = NULL;
×
1192

UNCOV
1193
  pRaw = mndDnodeActionEncode(pDnode);
×
UNCOV
1194
  if (pRaw == NULL) {
×
1195
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1196
    if (terrno != 0) code = terrno;
×
1197
    goto _OVER;
×
1198
  }
UNCOV
1199
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
1200
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
×
UNCOV
1201
  pRaw = NULL;
×
1202

UNCOV
1203
  if (pMObj != NULL) {
×
UNCOV
1204
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1205
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
×
1206
  }
1207

UNCOV
1208
  if (pQObj != NULL) {
×
UNCOV
1209
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1210
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
×
1211
  }
1212

UNCOV
1213
  if (pSObj != NULL) {
×
UNCOV
1214
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1215
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
×
1216
  }
1217

UNCOV
1218
  if (numOfVnodes > 0) {
×
UNCOV
1219
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
×
UNCOV
1220
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
×
1221
  }
1222

UNCOV
1223
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1224

UNCOV
1225
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
×
1226
                                   1);  // TODO: check the return value
UNCOV
1227
  code = 0;
×
1228

UNCOV
1229
_OVER:
×
UNCOV
1230
  mndTransDrop(pTrans);
×
UNCOV
1231
  sdbFreeRaw(pRaw);
×
UNCOV
1232
  TAOS_RETURN(code);
×
1233
}
1234

UNCOV
1235
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The checks run in that order and stop at the first hit.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SQnodeObj *pQObj = mndAcquireQnode(pMnode, dnodeId);
  SSnodeObj *pSObj = NULL;
  SMnodeObj *pMObj = NULL;
  bool       isEmpty = false;

  if (pQObj == NULL) {
    pSObj = mndAcquireSnode(pMnode, dnodeId);
    if (pSObj == NULL) {
      pMObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMObj == NULL) {
        int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
        isEmpty = !(numOfVnodes > 0);
      }
    }
  }

  // Every handle is released unconditionally, including those still NULL.
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  return isEmpty;
}
1260

UNCOV
1261
/*
 * Handle a DROP DNODE request.
 *
 * The dnode is looked up by id and, failing that, by the "<fqdn>:<port>"
 * endpoint given in the request. The drop is refused when:
 *   - the dnode hosts the only mnode, or the mnode of this node itself;
 *   - the dnode is online and force/unsafe was requested;
 *   - the dnode is offline, not empty, and force was not requested.
 * An audit record is written once mndDropDnode() has been called.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started,
 * otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SDropDnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked is MND_OPER_DROP_MNODE although this
  // handler drops a dnode -- confirm this is intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  // "unsafe" implies "force".
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // Build the endpoint with an explicit format: the fqdn comes from the
    // request and must never be used as the format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // NOTE(review): these lookups use dropReq.dnodeId even when the dnode was
  // found through its endpoint; pDnode->id may be the intended key -- confirm.
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
  if (!isonline && !force && !isEmpty) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj1[30] = {0};
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1345

1346
/*
 * Broadcast a "create encrypt key" request to the selected dnodes.
 *
 * pReq      the originating RPC message (used for tracing and the mnode handle)
 * dnodeId   target dnode id; -1 or 0 selects every dnode
 * pDcfgReq  the config request to forward; its value is the key
 *
 * The key length must lie in [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]. Only one
 * such operation may run at a time, guarded by encryptMgmt.encrypting; the
 * flag is cleared again here when no request could be sent, and otherwise by
 * the response handler.
 *
 * Returns 0 on success, otherwise an error code (also stored in terrno when
 * terrno was still 0).
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the "encrypting" flag; a non-zero previous value means another
  // operation is still in flight.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never sent, so release it; then drop the row and
          // cancel the iteration before leaving the loop. Falling through to
          // the check below also clears the "encrypting" flag when nothing
          // was sent at all.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // Nothing in flight: no response will ever clear the flag, so do it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1423

1424
/*
 * Handle a client request to create the encryption key.
 *
 * When TD_ENTERPRISE or TD_ASTRA_TODO is defined: decode the request, check
 * the MND_OPER_CONFIG_DNODE privilege and, for the "encrypt_key" option,
 * forward config/value to mndProcessCreateEncryptKeyReqImpl(). Any other
 * option yields TSDB_CODE_PAR_INTERNAL_ERROR.
 *
 * Otherwise the function does nothing and returns 0.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }
  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
    // Take everything needed from cfgReq before it is released.
    int32_t dnodeId = cfgReq.dnodeId;
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
    tFreeSMCfgDnodeReq(&cfgReq);
    return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);
  } else {
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

#else
  TAOS_RETURN(code);
#endif
}
1453

1454
/*
 * Handle one dnode's response to a "create encrypt key" request.
 *
 * Increments the success or failure counter according to pRsp->code and, once
 * successes plus failures reach the number of requests sent (nEncrypt), clears
 * the "encrypting" flag so that a new operation may start.
 *
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Bump the counter for this response and read the other one as well: the
  // completion test needs both totals, otherwise a mix of successes and
  // failures would never be recognised as finished.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1478

UNCOV
1479
// Fills pBlock with a fixed list of (option name, value) rows taken from the
// global ts* configuration variables.
//
// Column 0 of each row receives the option name and column 1 its value, both
// as var-strings. The number of rows written is added to pShow->numOfRows and
// also returned. If colDataSetVal fails, the error is logged and the rows
// completed so far are reported. The `rows` argument is not consulted.
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t nCfg = 0;
  int32_t numOfRows = 0;
  int32_t code = 0;
  int32_t lino = 0;

  // Collect the name/value pairs; nCfg advances once per pair.
  cfgOpts[nCfg] = "statusInterval";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);

  cfgOpts[nCfg] = "timezone";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[nCfg] = "locale";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[nCfg] = "charset";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[nCfg] = "monitor";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[nCfg] = "monitorInterval";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[nCfg] = "slowLogThreshold";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[nCfg] = "slowLogMaxLen";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[nCfg] = "slowLogScope";
  (void)snprintf(cfgVals[nCfg++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  char nameBuf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char valueBuf[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  // Emit one output row per collected pair. numOfRows only advances after both
  // columns of a row were written, so a failure leaves it at the count of
  // complete rows.
  for (int32_t i = 0; i < nCfg; i++, numOfRows++) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(nameBuf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, numOfRows, (const char *)nameBuf, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(valueBuf, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, numOfRows, (const char *)valueBuf, false), &lino, _OVER);
  }

_OVER:
  if (code != 0) {
    mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1550

1551
// Cancel callback for the configs listing. It performs no work: both arguments
// are ignored.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1552

UNCOV
1553
// Fills pBlock with one row per dnode fetched from the sdb, up to `rows` rows.
//
// Columns are written in this order: dnode id, endpoint, current vnode count,
// supported vnode count, status string, created time, reboot time, offline
// reason (empty when the dnode is online) and, under TD_ENTERPRISE, the
// machine id.
//
// The iteration position is kept in pShow->pIter so a later call continues
// where this one stopped. The number of rows written is added to
// pShow->numOfRows and returned. On failure the error is logged and the rows
// completed so far are reported.
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    // dnode id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // endpoint, limited to the byte width of this column's schema entry
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // number of vnodes currently on this dnode
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numOfVnodes, false), pDnode, &lino, _OVER);

    // number of vnodes this dnode supports
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // status string; a trailing '*' marks a creating/dropping dnode that is
    // also offline
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    // created time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    // reboot time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // offline reason, empty for an online dnode. The text has no fixed upper
    // bound visible here, so it goes through a heap buffer sized to fit.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Route the allocation failure through the same macro as every other
      // error exit so pDnode receives identical cleanup.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the buffer before checking the result: the check jumps to _OVER on
    // failure and would otherwise leak it.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    // machine id
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1635

1636
// Cancel callback for the dnodes listing: hands the iterator to
// sdbCancelFetchByType for the SDB_DNODE table.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1640

1641
// Collects the fqdn of every dnode in the sdb.
//
// Returns an array of `char *`, each element being a copy made with
// taosStrdup, or NULL when the array itself cannot be created. A dnode whose
// fqdn cannot be copied or appended is logged and skipped, so the result never
// contains NULL entries.
// NOTE(review): the caller presumably owns the array and its strings — confirm
// against call sites.
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    // Nothing can be collected without the array; bail out before any fqdn is
    // duplicated.
    mError("failed to init array for dnode fqdns");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Do not store a NULL element; skip this dnode and keep going.
      mError("failed to copy fqdn, but continue at this time");
    } else if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the pointer, so the copy must be released here.
      taosMemoryFree(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }
  return fqdns;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc