• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "audit.h"
19
#include "mndCluster.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSnode.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "taos_monitor.h"
31
#include "tconfig.h"
32
#include "tjson.h"
33
#include "tmisce.h"
34
#include "tunit.h"
35

36
#define TSDB_DNODE_VER_NUMBER   2
37
#define TSDB_DNODE_RESERVE_SIZE 40
38

39
static const char *offlineReason[] = {
40
    "",
41
    "status msg timeout",
42
    "status not received",
43
    "version not match",
44
    "dnodeId not match",
45
    "clusterId not match",
46
    "statusInterval not match",
47
    "timezone not match",
48
    "locale not match",
49
    "charset not match",
50
    "ttlChangeOnWrite not match",
51
    "enableWhiteList not match",
52
    "encryptionKey not match",
53
    "monitor not match",
54
    "monitor switch not match",
55
    "monitor interval not match",
56
    "monitor slow log threshold not match",
57
    "monitor slow log sql max len not match",
58
    "monitor slow log scope not match",
59
    "unknown",
60
};
61

62
enum {
63
  DND_ACTIVE_CODE,
64
  DND_CONN_ACTIVE_CODE,
65
};
66

67
enum {
68
  DND_CREATE,
69
  DND_ADD,
70
  DND_DROP,
71
};
72

73
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
74
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
75
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
76
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
77
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
78
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
79
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
80

81
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
82
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
83
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
84
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
85
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
86
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
87
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
89
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
90
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
91

92
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
94
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
95
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
96

97
#ifdef _GRANT
98
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
99
#else
100
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
101
#endif
102

UNCOV
103
int32_t mndInitDnode(SMnode *pMnode) {
×
UNCOV
104
  SSdbTable table = {
×
105
      .sdbType = SDB_DNODE,
106
      .keyType = SDB_KEY_INT32,
107
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
108
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
109
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
110
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
111
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
112
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
113
  };
114

UNCOV
115
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
×
UNCOV
116
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
×
UNCOV
117
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
×
UNCOV
118
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
×
UNCOV
119
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
×
UNCOV
120
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
×
UNCOV
121
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
×
UNCOV
122
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
×
UNCOV
123
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
×
UNCOV
124
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
×
UNCOV
125
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
×
126

UNCOV
127
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
×
UNCOV
128
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
×
UNCOV
129
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
×
UNCOV
130
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
×
131

UNCOV
132
  return sdbSetTable(pMnode->pSdb, table);
×
133
}
134

135
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
136
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
137
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
UNCOV
138
void          mndCleanupDnode(SMnode *pMnode) {}
×
139

UNCOV
140
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
×
UNCOV
141
  int32_t  code = -1;
×
UNCOV
142
  SSdbRaw *pRaw = NULL;
×
UNCOV
143
  STrans  *pTrans = NULL;
×
144

UNCOV
145
  SDnodeObj dnodeObj = {0};
×
UNCOV
146
  dnodeObj.id = 1;
×
UNCOV
147
  dnodeObj.createdTime = taosGetTimestampMs();
×
UNCOV
148
  dnodeObj.updateTime = dnodeObj.createdTime;
×
UNCOV
149
  dnodeObj.port = tsServerPort;
×
UNCOV
150
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
×
UNCOV
151
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
×
UNCOV
152
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
×
UNCOV
153
  char *machineId = NULL;
×
UNCOV
154
  code = tGetMachineId(&machineId);
×
UNCOV
155
  if (machineId) {
×
UNCOV
156
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
×
UNCOV
157
    taosMemoryFreeClear(machineId);
×
158
  } else {
159
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
160
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
161
    goto _OVER;
×
162
#endif
163
  }
164

UNCOV
165
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
×
UNCOV
166
  if (pTrans == NULL) {
×
167
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
168
    if (terrno != 0) code = terrno;
×
169
    goto _OVER;
×
170
  }
UNCOV
171
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
×
172

UNCOV
173
  pRaw = mndDnodeActionEncode(&dnodeObj);
×
UNCOV
174
  if (pRaw == NULL) {
×
175
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
176
    if (terrno != 0) code = terrno;
×
177
    goto _OVER;
×
178
  }
UNCOV
179
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
180
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
×
UNCOV
181
  pRaw = NULL;
×
182

UNCOV
183
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
UNCOV
184
  code = 0;
×
UNCOV
185
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
×
186
                                   1);  // TODO: check the return value
187

UNCOV
188
_OVER:
×
UNCOV
189
  mndTransDrop(pTrans);
×
UNCOV
190
  sdbFreeRaw(pRaw);
×
UNCOV
191
  return code;
×
192
}
193

UNCOV
194
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
×
UNCOV
195
  int32_t code = 0;
×
UNCOV
196
  int32_t lino = 0;
×
UNCOV
197
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
198

UNCOV
199
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
×
UNCOV
200
  if (pRaw == NULL) goto _OVER;
×
201

UNCOV
202
  int32_t dataPos = 0;
×
UNCOV
203
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
×
UNCOV
204
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
×
UNCOV
205
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
×
UNCOV
206
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
×
UNCOV
207
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
×
UNCOV
208
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
×
UNCOV
209
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
×
UNCOV
210
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
×
UNCOV
211
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
×
UNCOV
212
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
×
213

UNCOV
214
  terrno = 0;
×
215

UNCOV
216
_OVER:
×
UNCOV
217
  if (terrno != 0) {
×
218
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
219
    sdbFreeRaw(pRaw);
×
220
    return NULL;
×
221
  }
222

UNCOV
223
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
×
UNCOV
224
  return pRaw;
×
225
}
226

UNCOV
227
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
×
UNCOV
228
  int32_t code = 0;
×
UNCOV
229
  int32_t lino = 0;
×
UNCOV
230
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
231
  SSdbRow   *pRow = NULL;
×
UNCOV
232
  SDnodeObj *pDnode = NULL;
×
233

UNCOV
234
  int8_t sver = 0;
×
UNCOV
235
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
UNCOV
236
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
×
237
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
238
    goto _OVER;
×
239
  }
240

UNCOV
241
  pRow = sdbAllocRow(sizeof(SDnodeObj));
×
UNCOV
242
  if (pRow == NULL) goto _OVER;
×
243

UNCOV
244
  pDnode = sdbGetRowObj(pRow);
×
UNCOV
245
  if (pDnode == NULL) goto _OVER;
×
246

UNCOV
247
  int32_t dataPos = 0;
×
UNCOV
248
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
×
UNCOV
249
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
×
UNCOV
250
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
×
UNCOV
251
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
×
UNCOV
252
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
×
UNCOV
253
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
×
UNCOV
254
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
×
UNCOV
255
  if (sver > 1) {
×
UNCOV
256
    int16_t keyLen = 0;
×
UNCOV
257
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
×
UNCOV
258
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
×
UNCOV
259
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
×
UNCOV
260
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
×
261
  }
262

UNCOV
263
  terrno = 0;
×
UNCOV
264
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
×
265
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
266
  }
267

UNCOV
268
_OVER:
×
UNCOV
269
  if (terrno != 0) {
×
270
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
271
    taosMemoryFreeClear(pRow);
×
272
    return NULL;
×
273
  }
274

UNCOV
275
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
×
UNCOV
276
  return pRow;
×
277
}
278

UNCOV
279
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
×
UNCOV
280
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
×
UNCOV
281
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
×
282

UNCOV
283
  char ep[TSDB_EP_LEN] = {0};
×
UNCOV
284
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
×
UNCOV
285
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
×
UNCOV
286
  return 0;
×
287
}
288

UNCOV
289
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
×
UNCOV
290
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
×
UNCOV
291
  return 0;
×
292
}
293

UNCOV
294
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
×
UNCOV
295
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
×
UNCOV
296
  pOld->updateTime = pNew->updateTime;
×
297
#ifdef TD_ENTERPRISE
UNCOV
298
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
×
299
#endif
UNCOV
300
  return 0;
×
301
}
302

UNCOV
303
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
×
UNCOV
304
  SSdb      *pSdb = pMnode->pSdb;
×
UNCOV
305
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
×
UNCOV
306
  if (pDnode == NULL) {
×
UNCOV
307
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
UNCOV
308
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
×
UNCOV
309
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
310
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
UNCOV
311
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
UNCOV
312
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
×
313
    } else {
314
      terrno = TSDB_CODE_APP_ERROR;
×
315
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
316
    }
317
  }
318

UNCOV
319
  return pDnode;
×
320
}
321

UNCOV
322
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
×
UNCOV
323
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
324
  sdbRelease(pSdb, pDnode);
×
UNCOV
325
}
×
326

UNCOV
327
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
×
UNCOV
328
  SEpSet epSet = {0};
×
UNCOV
329
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
×
UNCOV
330
  return epSet;
×
331
}
332

333
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
×
334
  SEpSet     epSet = {0};
×
335
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
336
  if (!pDnode) return epSet;
×
337

338
  epSet = mndGetDnodeEpset(pDnode);
×
339

340
  mndReleaseDnode(pMnode, pDnode);
×
341
  return epSet;
×
342
}
343

UNCOV
344
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
×
UNCOV
345
  SSdb *pSdb = pMnode->pSdb;
×
346

UNCOV
347
  void *pIter = NULL;
×
UNCOV
348
  while (1) {
×
UNCOV
349
    SDnodeObj *pDnode = NULL;
×
UNCOV
350
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
UNCOV
351
    if (pIter == NULL) break;
×
352

UNCOV
353
    if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
×
UNCOV
354
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
355
      return pDnode;
×
356
    }
357

UNCOV
358
    sdbRelease(pSdb, pDnode);
×
359
  }
360

UNCOV
361
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
×
UNCOV
362
  return NULL;
×
363
}
364

UNCOV
365
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
×
UNCOV
366
  SSdb *pSdb = pMnode->pSdb;
×
367

UNCOV
368
  void *pIter = NULL;
×
UNCOV
369
  while (1) {
×
UNCOV
370
    SDnodeObj *pDnode = NULL;
×
UNCOV
371
    ESdbStatus objStatus = 0;
×
UNCOV
372
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
×
UNCOV
373
    if (pIter == NULL) break;
×
374

UNCOV
375
    if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
×
UNCOV
376
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
377
      return pDnode;
×
378
    }
379

UNCOV
380
    sdbRelease(pSdb, pDnode);
×
381
  }
382

383
  return NULL;
×
384
}
385

UNCOV
386
int32_t mndGetDnodeSize(SMnode *pMnode) {
×
UNCOV
387
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
388
  return sdbGetSize(pSdb, SDB_DNODE);
×
389
}
390

391
int32_t mndGetDbSize(SMnode *pMnode) {
×
392
  SSdb *pSdb = pMnode->pSdb;
×
393
  return sdbGetSize(pSdb, SDB_DB);
×
394
}
395

UNCOV
396
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
×
UNCOV
397
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
×
UNCOV
398
  if (interval > 5000 * (int64_t)tsStatusInterval) {
×
UNCOV
399
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
×
UNCOV
400
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
×
401
    }
UNCOV
402
    return false;
×
403
  }
UNCOV
404
  return true;
×
405
}
406

UNCOV
407
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
×
UNCOV
408
  SSdb *pSdb = pMnode->pSdb;
×
409

UNCOV
410
  int32_t numOfEps = 0;
×
UNCOV
411
  void   *pIter = NULL;
×
UNCOV
412
  while (1) {
×
UNCOV
413
    SDnodeObj *pDnode = NULL;
×
UNCOV
414
    ESdbStatus objStatus = 0;
×
UNCOV
415
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
×
UNCOV
416
    if (pIter == NULL) break;
×
417

UNCOV
418
    SDnodeEp dnodeEp = {0};
×
UNCOV
419
    dnodeEp.id = pDnode->id;
×
UNCOV
420
    dnodeEp.ep.port = pDnode->port;
×
UNCOV
421
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
×
UNCOV
422
    sdbRelease(pSdb, pDnode);
×
423

UNCOV
424
    dnodeEp.isMnode = 0;
×
UNCOV
425
    if (mndIsMnode(pMnode, pDnode->id)) {
×
UNCOV
426
      dnodeEp.isMnode = 1;
×
427
    }
UNCOV
428
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
×
429
      mError("failed to put ep into array, but continue at this call");
×
430
    }
431
  }
UNCOV
432
}
×
433

UNCOV
434
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
×
UNCOV
435
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
436
  int32_t code = 0;
×
437

UNCOV
438
  int32_t numOfEps = 0;
×
UNCOV
439
  void   *pIter = NULL;
×
UNCOV
440
  while (1) {
×
UNCOV
441
    SDnodeObj *pDnode = NULL;
×
UNCOV
442
    ESdbStatus objStatus = 0;
×
UNCOV
443
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
×
UNCOV
444
    if (pIter == NULL) break;
×
445

446
    SDnodeInfo dInfo;
UNCOV
447
    dInfo.id = pDnode->id;
×
UNCOV
448
    dInfo.ep.port = pDnode->port;
×
UNCOV
449
    dInfo.offlineReason = pDnode->offlineReason;
×
UNCOV
450
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
×
UNCOV
451
    sdbRelease(pSdb, pDnode);
×
UNCOV
452
    if (mndIsMnode(pMnode, pDnode->id)) {
×
UNCOV
453
      dInfo.isMnode = 1;
×
454
    } else {
UNCOV
455
      dInfo.isMnode = 0;
×
456
    }
457

UNCOV
458
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
×
459
      code = terrno;
×
460
      sdbCancelFetch(pSdb, pIter);
×
461
      break;
×
462
    }
463
  }
UNCOV
464
  TAOS_RETURN(code);
×
465
}
466

467
#define CHECK_MONITOR_PARA(para, err)                                                                    \
468
  if (pCfg->monitorParas.para != para) {                                                                 \
469
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
470
    terrno = err;                                                                                        \
471
    return err;                                                                                          \
472
  }
473

UNCOV
474
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
×
UNCOV
475
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
×
UNCOV
476
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
×
UNCOV
477
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
×
UNCOV
478
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
×
UNCOV
479
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
×
480

UNCOV
481
  if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
×
482
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
483
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
484
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
485
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
486
  }
487

UNCOV
488
  if (pCfg->statusInterval != tsStatusInterval) {
×
489
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
×
490
           tsStatusInterval);
491
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
×
492
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
×
493
  }
494

UNCOV
495
  if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
×
496
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
497
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
498
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
499
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
500
  }
501

UNCOV
502
  if (0 != strcasecmp(pCfg->locale, tsLocale)) {
×
503
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
504
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
505
    return DND_REASON_LOCALE_NOT_MATCH;
×
506
  }
507

UNCOV
508
  if (0 != strcasecmp(pCfg->charset, tsCharset)) {
×
509
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
510
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
511
    return DND_REASON_CHARSET_NOT_MATCH;
×
512
  }
513

UNCOV
514
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
×
515
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
516
           tsTtlChangeOnWrite);
517
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
518
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
519
  }
UNCOV
520
  int8_t enable = tsEnableWhiteList ? 1 : 0;
×
UNCOV
521
  if (pCfg->enableWhiteList != enable) {
×
522
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
523
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
524
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
525
  }
526

UNCOV
527
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
×
UNCOV
528
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
×
529
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
530
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
531
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
532
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
533
  }
534

UNCOV
535
  return DND_REASON_ONLINE;
×
536
}
537

UNCOV
538
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
×
UNCOV
539
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
×
540
    return 0.0;
×
541
  }
542

UNCOV
543
  int64_t deltaCount = currentCount - lastCount;
×
UNCOV
544
  int64_t deltaMs = currentTimeMs - lastTimeMs;
×
UNCOV
545
  double  rate = (double)deltaCount / (double)deltaMs;
×
UNCOV
546
  return rate;
×
547
}
548

UNCOV
549
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
×
UNCOV
550
  bool stateChanged = false;
×
UNCOV
551
  bool roleChanged = pGid->syncState != pVload->syncState ||
×
UNCOV
552
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
×
UNCOV
553
                     pGid->roleTimeMs != pVload->roleTimeMs;
×
554

UNCOV
555
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
×
UNCOV
556
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
×
UNCOV
557
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
×
UNCOV
558
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
×
UNCOV
559
      int64_t currentTimeMs = taosGetTimestampMs();
×
UNCOV
560
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
×
561
                                          pGid->lastSyncAppliedIndexUpdateTime);
562

UNCOV
563
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
×
564
    }
565
  }
566

UNCOV
567
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
×
UNCOV
568
  pGid->syncCommitIndex = pVload->syncCommitIndex;
×
UNCOV
569
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
×
UNCOV
570
      pGid->startTimeMs != pVload->startTimeMs) {
×
UNCOV
571
    mInfo(
×
572
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
573
        "canRead:%d, dnode:%d",
574
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
575
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
UNCOV
576
    pGid->syncState = pVload->syncState;
×
UNCOV
577
    pGid->syncTerm = pVload->syncTerm;
×
UNCOV
578
    pGid->syncRestore = pVload->syncRestore;
×
UNCOV
579
    pGid->syncCanRead = pVload->syncCanRead;
×
UNCOV
580
    pGid->startTimeMs = pVload->startTimeMs;
×
UNCOV
581
    pGid->roleTimeMs = pVload->roleTimeMs;
×
UNCOV
582
    stateChanged = true;
×
583
  }
UNCOV
584
  return stateChanged;
×
585
}
586

UNCOV
587
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
×
UNCOV
588
  bool stateChanged = false;
×
UNCOV
589
  bool roleChanged = pObj->syncState != pMload->syncState ||
×
UNCOV
590
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
×
UNCOV
591
                     pObj->roleTimeMs != pMload->roleTimeMs;
×
UNCOV
592
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
×
UNCOV
593
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
×
594
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
595
          pObj->syncTerm, pMload->syncTerm);
UNCOV
596
    pObj->syncState = pMload->syncState;
×
UNCOV
597
    pObj->syncTerm = pMload->syncTerm;
×
UNCOV
598
    pObj->syncRestore = pMload->syncRestore;
×
UNCOV
599
    pObj->roleTimeMs = pMload->roleTimeMs;
×
UNCOV
600
    stateChanged = true;
×
601
  }
UNCOV
602
  return stateChanged;
×
603
}
604

605
extern char   *tsMonFwUri;
606
extern char   *tsMonSlowLogUri;
UNCOV
607
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
×
UNCOV
608
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
609
  SStatisReq statisReq = {0};
×
UNCOV
610
  int32_t    code = -1;
×
611

UNCOV
612
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
×
613

UNCOV
614
  if (tsMonitorLogProtocol) {
×
615
    mInfo("process statis req,\n %s", statisReq.pCont);
×
616
  }
617

UNCOV
618
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
×
UNCOV
619
    monSendContent(statisReq.pCont, tsMonFwUri);
×
UNCOV
620
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
UNCOV
621
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
622
  }
623

UNCOV
624
  tFreeSStatisReq(&statisReq);
×
UNCOV
625
  return 0;
×
626
}
627

UNCOV
628
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
×
UNCOV
629
  mTrace("process audit req:%p", pReq);
×
UNCOV
630
  if (tsEnableAudit && tsEnableAuditDelete) {
×
UNCOV
631
    SMnode   *pMnode = pReq->info.node;
×
UNCOV
632
    SAuditReq auditReq = {0};
×
633

UNCOV
634
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
×
635

UNCOV
636
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
×
637

UNCOV
638
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
×
639
                   auditReq.sqlLen);
640

UNCOV
641
    tFreeSAuditReq(&auditReq);
×
642
  }
UNCOV
643
  return 0;
×
644
}
645

UNCOV
646
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
×
UNCOV
647
  int32_t       code = 0, lino = 0;
×
UNCOV
648
  SDnodeInfoReq infoReq = {0};
×
UNCOV
649
  int32_t       contLen = 0;
×
UNCOV
650
  void         *pReq = NULL;
×
651

UNCOV
652
  infoReq.dnodeId = pDnode->id;
×
UNCOV
653
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
×
654

UNCOV
655
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
×
656
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
657
  }
UNCOV
658
  pReq = rpcMallocCont(contLen);
×
UNCOV
659
  if (pReq == NULL) {
×
660
    TAOS_RETURN(terrno);
×
661
  }
662

UNCOV
663
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
×
664
    code = contLen;
×
665
    goto _exit;
×
666
  }
667

UNCOV
668
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
×
UNCOV
669
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
×
UNCOV
670
_exit:
×
UNCOV
671
  if (code < 0) {
×
672
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
673
  }
UNCOV
674
  TAOS_RETURN(code);
×
675
}
676

UNCOV
677
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
×
UNCOV
678
  int32_t       code = 0, lino = 0;
×
UNCOV
679
  SMnode       *pMnode = pReq->info.node;
×
UNCOV
680
  SDnodeInfoReq infoReq = {0};
×
UNCOV
681
  SDnodeObj    *pDnode = NULL;
×
UNCOV
682
  STrans       *pTrans = NULL;
×
UNCOV
683
  SSdbRaw      *pCommitRaw = NULL;
×
684

UNCOV
685
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
×
686

UNCOV
687
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
×
UNCOV
688
  if (pDnode == NULL) {
×
689
    TAOS_CHECK_EXIT(terrno);
×
690
  }
691

UNCOV
692
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
×
UNCOV
693
  if (pTrans == NULL) {
×
694
    TAOS_CHECK_EXIT(terrno);
×
695
  }
696

UNCOV
697
  pDnode->updateTime = taosGetTimestampMs();
×
698

UNCOV
699
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
×
700
    TAOS_CHECK_EXIT(terrno);
×
701
  }
UNCOV
702
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
703
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
704
    TAOS_CHECK_EXIT(code);
×
705
  }
UNCOV
706
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
UNCOV
707
  pCommitRaw = NULL;
×
708

UNCOV
709
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
710
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
711
    TAOS_CHECK_EXIT(code);
×
712
  }
713

UNCOV
714
_exit:
×
UNCOV
715
  mndReleaseDnode(pMnode, pDnode);
×
UNCOV
716
  if (code != 0) {
×
717
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
×
718
  }
UNCOV
719
  mndTransDrop(pTrans);
×
UNCOV
720
  sdbFreeRaw(pCommitRaw);
×
UNCOV
721
  TAOS_RETURN(code);
×
722
}
723

UNCOV
724
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
×
UNCOV
725
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
726
  SStatusReq statusReq = {0};
×
UNCOV
727
  SDnodeObj *pDnode = NULL;
×
UNCOV
728
  int32_t    code = -1;
×
729

UNCOV
730
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
×
731

UNCOV
732
  int64_t clusterid = mndGetClusterId(pMnode);
×
UNCOV
733
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
×
UNCOV
734
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
UNCOV
735
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
736
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
UNCOV
737
    goto _OVER;
×
738
  }
739

UNCOV
740
  if (statusReq.dnodeId == 0) {
×
UNCOV
741
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
×
UNCOV
742
    if (pDnode == NULL) {
×
UNCOV
743
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
×
UNCOV
744
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
745
      if (terrno != 0) code = terrno;
×
UNCOV
746
      goto _OVER;
×
747
    }
748
  } else {
UNCOV
749
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
×
UNCOV
750
    if (pDnode == NULL) {
×
UNCOV
751
      int32_t err = terrno;
×
UNCOV
752
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
×
UNCOV
753
      if (pDnode != NULL) {
×
UNCOV
754
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
×
UNCOV
755
        terrno = err;
×
UNCOV
756
        goto _OVER;
×
757
      }
758

UNCOV
759
      mError("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
×
UNCOV
760
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
×
UNCOV
761
        terrno = err;
×
UNCOV
762
        goto _OVER;
×
763
      } else {
UNCOV
764
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
×
UNCOV
765
        if (pDnode == NULL) goto _OVER;
×
766
      }
767
    }
768
  }
769

UNCOV
770
  pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
×
771

UNCOV
772
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
×
UNCOV
773
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
×
UNCOV
774
  int64_t curMs = taosGetTimestampMs();
×
UNCOV
775
  bool    online = mndIsDnodeOnline(pDnode, curMs);
×
UNCOV
776
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
×
UNCOV
777
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
×
UNCOV
778
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
×
UNCOV
779
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
×
UNCOV
780
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
×
UNCOV
781
  bool    analVerChanged = (analVer != statusReq.analVer);
×
UNCOV
782
  bool    needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
×
UNCOV
783
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
×
UNCOV
784
  const STraceId *trace = &pReq->info.traceId;
×
UNCOV
785
  mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
×
786
          pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
787

UNCOV
788
  if (reboot) {
×
UNCOV
789
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
×
790
  }
791

UNCOV
792
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
×
UNCOV
793
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
×
794

UNCOV
795
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
UNCOV
796
    if (pVgroup != NULL) {
×
UNCOV
797
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
UNCOV
798
        pVgroup->cacheUsage = pVload->cacheUsage;
×
UNCOV
799
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
×
UNCOV
800
        pVgroup->numOfTables = pVload->numOfTables;
×
UNCOV
801
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
×
UNCOV
802
        pVgroup->totalStorage = pVload->totalStorage;
×
UNCOV
803
        pVgroup->compStorage = pVload->compStorage;
×
UNCOV
804
        pVgroup->pointsWritten = pVload->pointsWritten;
×
805
      }
UNCOV
806
      bool stateChanged = false;
×
UNCOV
807
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
×
UNCOV
808
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
×
UNCOV
809
        if (pGid->dnodeId == statusReq.dnodeId) {
×
UNCOV
810
          if (pVload->startTimeMs == 0) {
×
811
            pVload->startTimeMs = statusReq.rebootTime;
×
812
          }
UNCOV
813
          if (pVload->roleTimeMs == 0) {
×
814
            pVload->roleTimeMs = statusReq.rebootTime;
×
815
          }
UNCOV
816
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
×
UNCOV
817
          break;
×
818
        }
819
      }
UNCOV
820
      if (stateChanged) {
×
UNCOV
821
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
UNCOV
822
        if (pDb != NULL && pDb->stateTs != curMs) {
×
UNCOV
823
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
×
824
                pDb->stateTs, curMs);
UNCOV
825
          pDb->stateTs = curMs;
×
826
        }
UNCOV
827
        mndReleaseDb(pMnode, pDb);
×
828
      }
829
    }
830

UNCOV
831
    mndReleaseVgroup(pMnode, pVgroup);
×
832
  }
833

UNCOV
834
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
×
UNCOV
835
  if (pObj != NULL) {
×
UNCOV
836
    if (statusReq.mload.roleTimeMs == 0) {
×
UNCOV
837
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
×
838
    }
UNCOV
839
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
×
UNCOV
840
    mndReleaseMnode(pMnode, pObj);
×
841
  }
842

UNCOV
843
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
×
UNCOV
844
  if (pQnode != NULL) {
×
UNCOV
845
    pQnode->load = statusReq.qload;
×
UNCOV
846
    mndReleaseQnode(pMnode, pQnode);
×
847
  }
848

UNCOV
849
  if (needCheck) {
×
UNCOV
850
    if (statusReq.sver != tsVersion) {
×
851
      if (pDnode != NULL) {
×
852
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
853
      }
854
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
855
      terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
856
      goto _OVER;
×
857
    }
858

UNCOV
859
    if (statusReq.dnodeId == 0) {
×
UNCOV
860
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
×
861
    } else {
UNCOV
862
      if (statusReq.clusterId != pMnode->clusterId) {
×
863
        if (pDnode != NULL) {
×
864
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
865
        }
866
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
867
               pMnode->clusterId);
868
        terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
869
        goto _OVER;
×
870
      }
871
    }
872

873
    // Verify whether the cluster parameters are consistent when status change from offline to ready
UNCOV
874
    pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
×
UNCOV
875
    if (pDnode->offlineReason != 0) {
×
876
      mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
×
877
      if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
×
878
      goto _OVER;
×
879
    }
880

UNCOV
881
    if (!online) {
×
UNCOV
882
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
×
883
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
884
    } else {
UNCOV
885
      mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
×
886
            statusReq.dnodeVer, dnodeVer, reboot);
887
    }
888

UNCOV
889
    pDnode->rebootTime = statusReq.rebootTime;
×
UNCOV
890
    pDnode->numOfCores = statusReq.numOfCores;
×
UNCOV
891
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
×
UNCOV
892
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
×
UNCOV
893
    pDnode->memAvail = statusReq.memAvail;
×
UNCOV
894
    pDnode->memTotal = statusReq.memTotal;
×
UNCOV
895
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
×
UNCOV
896
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
×
UNCOV
897
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
×
UNCOV
898
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
×
UNCOV
899
      if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
×
900
        goto _OVER;
×
901
      }
902
    }
903

UNCOV
904
    SStatusRsp statusRsp = {0};
×
UNCOV
905
    statusRsp.statusSeq++;
×
UNCOV
906
    statusRsp.analVer = analVer;
×
UNCOV
907
    statusRsp.dnodeVer = dnodeVer;
×
UNCOV
908
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
×
UNCOV
909
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
×
UNCOV
910
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
×
UNCOV
911
    if (statusRsp.pDnodeEps == NULL) {
×
912
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
913
      goto _OVER;
×
914
    }
915

UNCOV
916
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
×
UNCOV
917
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
×
918

UNCOV
919
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
×
UNCOV
920
    void   *pHead = rpcMallocCont(contLen);
×
UNCOV
921
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
×
UNCOV
922
    taosArrayDestroy(statusRsp.pDnodeEps);
×
UNCOV
923
    if (contLen < 0) {
×
924
      code = contLen;
×
925
      goto _OVER;
×
926
    }
927

UNCOV
928
    pReq->info.rspLen = contLen;
×
UNCOV
929
    pReq->info.rsp = pHead;
×
930
  }
931

UNCOV
932
  pDnode->accessTimes++;
×
UNCOV
933
  pDnode->lastAccessTime = curMs;
×
UNCOV
934
  code = 0;
×
935

UNCOV
936
_OVER:
×
UNCOV
937
  mndReleaseDnode(pMnode, pDnode);
×
UNCOV
938
  taosArrayDestroy(statusReq.pVloads);
×
UNCOV
939
  return mndUpdClusterInfo(pReq);
×
940
}
941

942
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
943
  SMnode    *pMnode = pReq->info.node;
×
944
  SNotifyReq notifyReq = {0};
×
945
  int32_t    code = 0;
×
946

947
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
948
    terrno = code;
×
949
    goto _OVER;
×
950
  }
951

952
  int64_t clusterid = mndGetClusterId(pMnode);
×
953
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
954
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
955
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
956
          notifyReq.clusterId, clusterid, tstrerror(code));
957
    goto _OVER;
×
958
  }
959

960
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
961
  for (int32_t v = 0; v < nVgroup; ++v) {
×
962
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
963

964
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
965
    if (pVgroup != NULL) {
×
966
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
967
      mndReleaseVgroup(pMnode, pVgroup);
×
968
    }
969
  }
970
  code = mndUpdClusterInfo(pReq);
×
971
_OVER:
×
972
  tFreeSNotifyReq(&notifyReq);
×
973
  return code;
×
974
}
975

UNCOV
976
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
×
UNCOV
977
  int32_t  code = -1;
×
UNCOV
978
  SSdbRaw *pRaw = NULL;
×
UNCOV
979
  STrans  *pTrans = NULL;
×
980

UNCOV
981
  SDnodeObj dnodeObj = {0};
×
UNCOV
982
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
×
UNCOV
983
  dnodeObj.createdTime = taosGetTimestampMs();
×
UNCOV
984
  dnodeObj.updateTime = dnodeObj.createdTime;
×
UNCOV
985
  dnodeObj.port = pCreate->port;
×
UNCOV
986
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
×
UNCOV
987
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
×
988

UNCOV
989
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
×
UNCOV
990
  if (pTrans == NULL) {
×
991
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
992
    if (terrno != 0) code = terrno;
×
993
    goto _OVER;
×
994
  }
UNCOV
995
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
×
UNCOV
996
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
997

UNCOV
998
  pRaw = mndDnodeActionEncode(&dnodeObj);
×
UNCOV
999
  if (pRaw == NULL) {
×
1000
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1001
    if (terrno != 0) code = terrno;
×
1002
    goto _OVER;
×
1003
  }
UNCOV
1004
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
1005
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
×
UNCOV
1006
  pRaw = NULL;
×
1007

UNCOV
1008
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
UNCOV
1009
  code = 0;
×
1010

UNCOV
1011
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD,
×
1012
                                   1);  // TODO: check the return value
UNCOV
1013
_OVER:
×
UNCOV
1014
  mndTransDrop(pTrans);
×
UNCOV
1015
  sdbFreeRaw(pRaw);
×
UNCOV
1016
  return code;
×
1017
}
1018

UNCOV
1019
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
×
UNCOV
1020
  SMnode       *pMnode = pReq->info.node;
×
UNCOV
1021
  SSdb         *pSdb = pMnode->pSdb;
×
UNCOV
1022
  SDnodeObj    *pObj = NULL;
×
UNCOV
1023
  void         *pIter = NULL;
×
UNCOV
1024
  SDnodeListRsp rsp = {0};
×
UNCOV
1025
  int32_t       code = -1;
×
1026

UNCOV
1027
  rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
×
UNCOV
1028
  if (NULL == rsp.dnodeList) {
×
1029
    mError("failed to alloc epSet while process dnode list req");
×
1030
    code = terrno;
×
1031
    goto _OVER;
×
1032
  }
1033

UNCOV
1034
  while (1) {
×
UNCOV
1035
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
×
UNCOV
1036
    if (pIter == NULL) break;
×
1037

UNCOV
1038
    SEpSet epSet = {0};
×
UNCOV
1039
    epSet.numOfEps = 1;
×
UNCOV
1040
    tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
×
UNCOV
1041
    epSet.eps[0].port = pObj->port;
×
1042

UNCOV
1043
    if (taosArrayPush(rsp.dnodeList, &epSet) == NULL) {
×
1044
      if (terrno != 0) code = terrno;
×
1045
      sdbRelease(pSdb, pObj);
×
1046
      sdbCancelFetch(pSdb, pIter);
×
1047
      goto _OVER;
×
1048
    }
1049

UNCOV
1050
    sdbRelease(pSdb, pObj);
×
1051
  }
1052

UNCOV
1053
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
×
UNCOV
1054
  void   *pRsp = rpcMallocCont(rspLen);
×
UNCOV
1055
  if (pRsp == NULL) {
×
1056
    code = terrno;
×
1057
    goto _OVER;
×
1058
  }
1059

UNCOV
1060
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
×
1061
    code = rspLen;
×
1062
    goto _OVER;
×
1063
  }
1064

UNCOV
1065
  pReq->info.rspLen = rspLen;
×
UNCOV
1066
  pReq->info.rsp = pRsp;
×
UNCOV
1067
  code = 0;
×
1068

UNCOV
1069
_OVER:
×
1070

UNCOV
1071
  if (code != 0) {
×
1072
    mError("failed to get dnode list since %s", tstrerror(code));
×
1073
  }
1074

UNCOV
1075
  tFreeSDnodeListRsp(&rsp);
×
1076

UNCOV
1077
  TAOS_RETURN(code);
×
1078
}
1079

UNCOV
1080
void getSlowLogScopeString(int32_t scope, char *result) {
×
UNCOV
1081
  if (scope == SLOW_LOG_TYPE_NULL) {
×
1082
    (void)strncat(result, "NONE", 64);
×
1083
    return;
×
1084
  }
UNCOV
1085
  while (scope > 0) {
×
UNCOV
1086
    if (scope & SLOW_LOG_TYPE_QUERY) {
×
UNCOV
1087
      (void)strncat(result, "QUERY", 64);
×
UNCOV
1088
      scope &= ~SLOW_LOG_TYPE_QUERY;
×
UNCOV
1089
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
UNCOV
1090
      (void)strncat(result, "INSERT", 64);
×
UNCOV
1091
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
UNCOV
1092
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
UNCOV
1093
      (void)strncat(result, "OTHERS", 64);
×
UNCOV
1094
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1095
    } else {
1096
      (void)printf("invalid slow log scope:%d", scope);
×
1097
      return;
×
1098
    }
1099

UNCOV
1100
    if (scope > 0) {
×
UNCOV
1101
      (void)strncat(result, "|", 64);
×
1102
    }
1103
  }
1104
}
1105

UNCOV
1106
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
×
UNCOV
1107
  SMnode         *pMnode = pReq->info.node;
×
UNCOV
1108
  int32_t         code = -1;
×
UNCOV
1109
  SDnodeObj      *pDnode = NULL;
×
UNCOV
1110
  SCreateDnodeReq createReq = {0};
×
1111

UNCOV
1112
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
×
1113
    goto _OVER;
×
1114
  }
1115

UNCOV
1116
  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
×
1117

UNCOV
1118
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
×
UNCOV
1119
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);
×
1120

UNCOV
1121
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
×
1122
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
1123
    goto _OVER;
×
1124
  }
1125

1126
  char ep[TSDB_EP_LEN];
UNCOV
1127
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
×
UNCOV
1128
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
UNCOV
1129
  if (pDnode != NULL) {
×
1130
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
1131
    goto _OVER;
×
1132
  }
1133

UNCOV
1134
  code = mndCreateDnode(pMnode, pReq, &createReq);
×
UNCOV
1135
  if (code == 0) {
×
UNCOV
1136
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
UNCOV
1137
    tsGrantHBInterval = 5;
×
1138
  }
1139

UNCOV
1140
  char obj[200] = {0};
×
UNCOV
1141
  (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
×
1142

UNCOV
1143
  auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
×
1144

UNCOV
1145
_OVER:
×
UNCOV
1146
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
1147
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
×
1148
  }
1149

UNCOV
1150
  mndReleaseDnode(pMnode, pDnode);
×
UNCOV
1151
  tFreeSCreateDnodeReq(&createReq);
×
UNCOV
1152
  TAOS_RETURN(code);
×
1153
}
1154

1155
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1156

UNCOV
1157
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
×
1158

1159
#ifndef TD_ENTERPRISE
1160
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1161
#endif
1162

UNCOV
1163
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
×
1164
                            SSnodeObj *pSObj, int32_t numOfVnodes, bool force, bool unsafe) {
UNCOV
1165
  int32_t  code = -1;
×
UNCOV
1166
  SSdbRaw *pRaw = NULL;
×
UNCOV
1167
  STrans  *pTrans = NULL;
×
1168

UNCOV
1169
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
×
UNCOV
1170
  if (pTrans == NULL) {
×
1171
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1172
    if (terrno != 0) code = terrno;
×
1173
    goto _OVER;
×
1174
  }
UNCOV
1175
  mndTransSetSerial(pTrans);
×
UNCOV
1176
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
×
UNCOV
1177
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1178

UNCOV
1179
  pRaw = mndDnodeActionEncode(pDnode);
×
UNCOV
1180
  if (pRaw == NULL) {
×
1181
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1182
    if (terrno != 0) code = terrno;
×
1183
    goto _OVER;
×
1184
  }
UNCOV
1185
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
1186
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), NULL, _OVER);
×
UNCOV
1187
  pRaw = NULL;
×
1188

UNCOV
1189
  pRaw = mndDnodeActionEncode(pDnode);
×
UNCOV
1190
  if (pRaw == NULL) {
×
1191
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1192
    if (terrno != 0) code = terrno;
×
1193
    goto _OVER;
×
1194
  }
UNCOV
1195
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
×
UNCOV
1196
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), NULL, _OVER);
×
UNCOV
1197
  pRaw = NULL;
×
1198

UNCOV
1199
  if (pMObj != NULL) {
×
UNCOV
1200
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1201
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
×
1202
  }
1203

UNCOV
1204
  if (pQObj != NULL) {
×
UNCOV
1205
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1206
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
×
1207
  }
1208

UNCOV
1209
  if (pSObj != NULL) {
×
UNCOV
1210
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
×
UNCOV
1211
    TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
×
1212
  }
1213

UNCOV
1214
  if (numOfVnodes > 0) {
×
UNCOV
1215
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
×
UNCOV
1216
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
×
1217
  }
1218

UNCOV
1219
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1220

UNCOV
1221
  (void)mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP,
×
1222
                                   1);  // TODO: check the return value
UNCOV
1223
  code = 0;
×
1224

UNCOV
1225
_OVER:
×
UNCOV
1226
  mndTransDrop(pTrans);
×
UNCOV
1227
  sdbFreeRaw(pRaw);
×
UNCOV
1228
  TAOS_RETURN(code);
×
1229
}
1230

UNCOV
1231
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
UNCOV
1232
  bool       isEmpty = false;
×
UNCOV
1233
  SMnodeObj *pMObj = NULL;
×
UNCOV
1234
  SQnodeObj *pQObj = NULL;
×
UNCOV
1235
  SSnodeObj *pSObj = NULL;
×
1236

UNCOV
1237
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
UNCOV
1238
  if (pQObj) goto _OVER;
×
1239

UNCOV
1240
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
UNCOV
1241
  if (pSObj) goto _OVER;
×
1242

UNCOV
1243
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
UNCOV
1244
  if (pMObj) goto _OVER;
×
1245

UNCOV
1246
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
UNCOV
1247
  if (numOfVnodes > 0) goto _OVER;
×
1248

UNCOV
1249
  isEmpty = true;
×
UNCOV
1250
_OVER:
×
UNCOV
1251
  mndReleaseMnode(pMnode, pMObj);
×
UNCOV
1252
  mndReleaseQnode(pMnode, pQObj);
×
UNCOV
1253
  mndReleaseSnode(pMnode, pSObj);
×
UNCOV
1254
  return isEmpty;
×
1255
}
1256

UNCOV
1257
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
×
UNCOV
1258
  SMnode       *pMnode = pReq->info.node;
×
UNCOV
1259
  int32_t       code = -1;
×
UNCOV
1260
  SDnodeObj    *pDnode = NULL;
×
UNCOV
1261
  SMnodeObj    *pMObj = NULL;
×
UNCOV
1262
  SQnodeObj    *pQObj = NULL;
×
UNCOV
1263
  SSnodeObj    *pSObj = NULL;
×
UNCOV
1264
  SDropDnodeReq dropReq = {0};
×
1265

UNCOV
1266
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
×
1267

UNCOV
1268
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
×
1269
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
UNCOV
1270
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
×
1271

UNCOV
1272
  bool force = dropReq.force;
×
UNCOV
1273
  if (dropReq.unsafe) {
×
UNCOV
1274
    force = true;
×
1275
  }
1276

UNCOV
1277
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
×
UNCOV
1278
  if (pDnode == NULL) {
×
UNCOV
1279
    int32_t err = terrno;
×
UNCOV
1280
    char    ep[TSDB_EP_LEN + 1] = {0};
×
UNCOV
1281
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
UNCOV
1282
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
UNCOV
1283
    if (pDnode == NULL) {
×
UNCOV
1284
      code = err;
×
UNCOV
1285
      goto _OVER;
×
1286
    }
1287
  }
1288

UNCOV
1289
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
×
UNCOV
1290
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
×
UNCOV
1291
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
×
UNCOV
1292
  if (pMObj != NULL) {
×
UNCOV
1293
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
×
UNCOV
1294
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
×
UNCOV
1295
      goto _OVER;
×
1296
    }
UNCOV
1297
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
×
UNCOV
1298
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
×
UNCOV
1299
      goto _OVER;
×
1300
    }
1301
  }
1302

UNCOV
1303
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
×
UNCOV
1304
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
×
1305

UNCOV
1306
  if (isonline && force) {
×
1307
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
1308
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
×
1309
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1310
    goto _OVER;
×
1311
  }
1312

UNCOV
1313
  bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
×
UNCOV
1314
  if (!isonline && !force && !isEmpty) {
×
UNCOV
1315
    code = TSDB_CODE_DNODE_OFFLINE;
×
UNCOV
1316
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
×
1317
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
UNCOV
1318
    goto _OVER;
×
1319
  }
1320

UNCOV
1321
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
×
UNCOV
1322
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1323

UNCOV
1324
  char obj1[30] = {0};
×
UNCOV
1325
  (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
×
1326

UNCOV
1327
  auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
×
1328

UNCOV
1329
_OVER:
×
UNCOV
1330
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
1331
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1332
  }
1333

UNCOV
1334
  mndReleaseDnode(pMnode, pDnode);
×
UNCOV
1335
  mndReleaseMnode(pMnode, pMObj);
×
UNCOV
1336
  mndReleaseQnode(pMnode, pQObj);
×
UNCOV
1337
  mndReleaseSnode(pMnode, pSObj);
×
UNCOV
1338
  tFreeSDropDnodeReq(&dropReq);
×
UNCOV
1339
  TAOS_RETURN(code);
×
1340
}
1341

1342
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
×
1343
  int32_t code = 0;
×
1344
  SMnode *pMnode = pReq->info.node;
×
1345
  SSdb   *pSdb = pMnode->pSdb;
×
1346
  void   *pIter = NULL;
×
1347
  int8_t  encrypting = 0;
×
1348

1349
  const STraceId *trace = &pReq->info.traceId;
×
1350

1351
  int32_t klen = strlen(pDcfgReq->value);
×
1352
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
×
1353
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
1354
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1355
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
1356
    goto _exit;
×
1357
  }
1358

1359
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
×
1360
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1361
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
1362
    goto _exit;
×
1363
  }
1364

1365
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
×
1366
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1367
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
1368
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1369
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
1370
    goto _exit;
×
1371
  }
1372

1373
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
×
1374
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
×
1375
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
×
1376

1377
  while (1) {
×
1378
    SDnodeObj *pDnode = NULL;
×
1379
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
1380
    if (pIter == NULL) break;
×
1381
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
×
1382
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1383
             offlineReason[pDnode->offlineReason]);
1384
      sdbRelease(pSdb, pDnode);
×
1385
      continue;
×
1386
    }
1387

1388
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
×
1389
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
×
1390
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
×
1391
      void   *pBuf = rpcMallocCont(bufLen);
×
1392

1393
      if (pBuf != NULL) {
×
1394
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
×
1395
          code = bufLen;
×
1396
          sdbRelease(pSdb, pDnode);
×
1397
          goto _exit;
×
1398
        }
1399
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
×
1400
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
×
1401
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
×
1402
        }
1403
      }
1404
    }
1405

1406
    sdbRelease(pSdb, pDnode);
×
1407
  }
1408

1409
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
×
1410
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1411
  }
1412

1413
_exit:
×
1414
  if (code != 0) {
×
1415
    if (terrno == 0) terrno = code;
×
1416
  }
1417
  return code;
×
1418
}
1419

1420
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
×
1421
  int32_t code = 0;
×
1422

1423
#ifdef TD_ENTERPRISE
1424
  SMnode       *pMnode = pReq->info.node;
×
1425
  SMCfgDnodeReq cfgReq = {0};
×
1426
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
×
1427

1428
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
×
1429
    tFreeSMCfgDnodeReq(&cfgReq);
×
1430
    TAOS_RETURN(code);
×
1431
  }
1432
  const STraceId *trace = &pReq->info.traceId;
×
1433
  SDCfgDnodeReq   dcfgReq = {0};
×
1434
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
×
1435
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
×
1436
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
×
1437
    tFreeSMCfgDnodeReq(&cfgReq);
×
1438
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
×
1439
  } else {
1440
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
1441
    tFreeSMCfgDnodeReq(&cfgReq);
×
1442
    TAOS_RETURN(code);
×
1443
  }
1444

1445
#else
1446
  TAOS_RETURN(code);
1447
#endif
1448
}
1449

1450
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
×
1451
  SMnode *pMnode = pRsp->info.node;
×
1452
  int16_t nSuccess = 0;
×
1453
  int16_t nFailed = 0;
×
1454

1455
  if (0 == pRsp->code) {
×
1456
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
×
1457
  } else {
1458
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1459
  }
1460

1461
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
×
1462
  bool    finished = nSuccess + nFailed >= nReq;
×
1463

1464
  if (finished) {
×
1465
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1466
  }
1467

1468
  const STraceId *trace = &pRsp->info.traceId;
×
1469
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
×
1470
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1471

1472
  return 0;
×
1473
}
1474

UNCOV
1475
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
UNCOV
1476
  SMnode *pMnode = pReq->info.node;
×
UNCOV
1477
  int32_t totalRows = 0;
×
UNCOV
1478
  int32_t numOfRows = 0;
×
UNCOV
1479
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
×
UNCOV
1480
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
×
UNCOV
1481
  char   *pWrite = NULL;
×
UNCOV
1482
  int32_t cols = 0;
×
UNCOV
1483
  int32_t code = 0;
×
UNCOV
1484
  int32_t lino = 0;
×
1485

UNCOV
1486
  cfgOpts[totalRows] = "statusInterval";
×
UNCOV
1487
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
×
UNCOV
1488
  totalRows++;
×
1489

UNCOV
1490
  cfgOpts[totalRows] = "timezone";
×
UNCOV
1491
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
×
UNCOV
1492
  totalRows++;
×
1493

UNCOV
1494
  cfgOpts[totalRows] = "locale";
×
UNCOV
1495
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
×
UNCOV
1496
  totalRows++;
×
1497

UNCOV
1498
  cfgOpts[totalRows] = "charset";
×
UNCOV
1499
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
×
UNCOV
1500
  totalRows++;
×
1501

UNCOV
1502
  cfgOpts[totalRows] = "monitor";
×
UNCOV
1503
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
×
UNCOV
1504
  totalRows++;
×
1505

UNCOV
1506
  cfgOpts[totalRows] = "monitorInterval";
×
UNCOV
1507
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
×
UNCOV
1508
  totalRows++;
×
1509

UNCOV
1510
  cfgOpts[totalRows] = "slowLogThreshold";
×
UNCOV
1511
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
×
UNCOV
1512
  totalRows++;
×
1513

UNCOV
1514
  cfgOpts[totalRows] = "slowLogMaxLen";
×
UNCOV
1515
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
×
UNCOV
1516
  totalRows++;
×
1517

UNCOV
1518
  char scopeStr[64] = {0};
×
UNCOV
1519
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
×
UNCOV
1520
  cfgOpts[totalRows] = "slowLogScope";
×
UNCOV
1521
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
×
UNCOV
1522
  totalRows++;
×
1523

UNCOV
1524
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1525
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
×
1526

UNCOV
1527
  for (int32_t i = 0; i < totalRows; i++) {
×
UNCOV
1528
    cols = 0;
×
1529

UNCOV
1530
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
×
UNCOV
1531
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1532
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
×
1533

UNCOV
1534
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
×
UNCOV
1535
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1536
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
×
1537

UNCOV
1538
    numOfRows++;
×
1539
  }
1540

UNCOV
1541
_OVER:
×
UNCOV
1542
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
×
UNCOV
1543
  pShow->numOfRows += numOfRows;
×
UNCOV
1544
  return numOfRows;
×
1545
}
1546

1547
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1548

UNCOV
1549
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
UNCOV
1550
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
1551
  SSdb      *pSdb = pMnode->pSdb;
×
UNCOV
1552
  int32_t    numOfRows = 0;
×
UNCOV
1553
  int32_t    cols = 0;
×
UNCOV
1554
  ESdbStatus objStatus = 0;
×
UNCOV
1555
  SDnodeObj *pDnode = NULL;
×
UNCOV
1556
  int64_t    curMs = taosGetTimestampMs();
×
1557
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
UNCOV
1558
  int32_t    code = 0;
×
UNCOV
1559
  int32_t    lino = 0;
×
1560

UNCOV
1561
  while (numOfRows < rows) {
×
UNCOV
1562
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
×
UNCOV
1563
    if (pShow->pIter == NULL) break;
×
UNCOV
1564
    bool online = mndIsDnodeOnline(pDnode, curMs);
×
1565

UNCOV
1566
    cols = 0;
×
1567

UNCOV
1568
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1569
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
×
1570

UNCOV
1571
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
×
1572

UNCOV
1573
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1574
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
×
1575

UNCOV
1576
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1577
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
×
UNCOV
1578
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
×
1579

UNCOV
1580
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1581
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
×
1582
                        &lino, _OVER);
1583

UNCOV
1584
    const char *status = "ready";
×
UNCOV
1585
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
×
UNCOV
1586
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
×
UNCOV
1587
    if (!online) {
×
UNCOV
1588
      if (objStatus == SDB_STATUS_CREATING)
×
1589
        status = "creating*";
×
UNCOV
1590
      else if (objStatus == SDB_STATUS_DROPPING)
×
1591
        status = "dropping*";
×
1592
      else
UNCOV
1593
        status = "offline";
×
1594
    }
1595

UNCOV
1596
    STR_TO_VARSTR(buf, status);
×
UNCOV
1597
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1598
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
×
1599

UNCOV
1600
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1601
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
×
1602
                        _OVER);
1603

UNCOV
1604
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1605
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
×
1606
                        _OVER);
1607

UNCOV
1608
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
×
UNCOV
1609
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
×
1610

UNCOV
1611
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1612
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
×
UNCOV
1613
    taosMemoryFreeClear(b);
×
1614

1615
#ifdef TD_ENTERPRISE
UNCOV
1616
    STR_TO_VARSTR(buf, pDnode->machineId);
×
UNCOV
1617
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1618
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
×
1619
#endif
1620

UNCOV
1621
    numOfRows++;
×
UNCOV
1622
    sdbRelease(pSdb, pDnode);
×
1623
  }
1624

UNCOV
1625
_OVER:
×
UNCOV
1626
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
×
1627

UNCOV
1628
  pShow->numOfRows += numOfRows;
×
UNCOV
1629
  return numOfRows;
×
1630
}
1631

1632
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
1633
  SSdb *pSdb = pMnode->pSdb;
×
1634
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
1635
}
×
1636

UNCOV
1637
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
×
UNCOV
1638
  SDnodeObj *pObj = NULL;
×
UNCOV
1639
  void      *pIter = NULL;
×
UNCOV
1640
  SSdb      *pSdb = pMnode->pSdb;
×
UNCOV
1641
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
×
UNCOV
1642
  while (1) {
×
UNCOV
1643
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
×
UNCOV
1644
    if (pIter == NULL) break;
×
1645

UNCOV
1646
    char *fqdn = taosStrdup(pObj->fqdn);
×
UNCOV
1647
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
×
1648
      mError("failed to fqdn into array, but continue at this time");
×
1649
    }
UNCOV
1650
    sdbRelease(pSdb, pObj);
×
1651
  }
UNCOV
1652
  return fqdns;
×
1653
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc