• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5024

16 Apr 2026 10:31AM UTC coverage: 72.954% (+0.7%) from 72.251%
#5024

push

travis-ci

web-flow
merge: from main to 3.0 branch #35157

120 of 200 new or added lines in 14 files covered. (60.0%)

655 existing lines in 130 files now uncovered.

273150 of 374416 relevant lines covered (72.95%)

129604715.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.54
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
/*
 * Human-readable text for a dnode's offline reason.
 *
 * NOTE(review): the table is presumably indexed by the DND_REASON_* value
 * stored in SDnodeObj.offlineReason, so the order of entries must track that
 * enum - confirm against its declaration when adding a reason.
 *
 * Both the strings and the pointer slots are const: nothing in this file is
 * expected to rewrite the table at run time.
 */
static const char *const offlineReason[] = {
    "",
    "status msg timeout",
    "status not received",
    "version not match",
    "dnodeId not match",
    "clusterId not match",
    "statusInterval not match",
    "timezone not match",
    "locale not match",
    "charset not match",
    "ttlChangeOnWrite not match",
    "enableWhiteList not match",
    "encryptionKey not match",
    "monitor not match",
    "monitor switch not match",
    "monitor interval not match",
    "monitor slow log threshold not match",
    "monitor slow log sql max len not match",
    "monitor slow log scope not match",
    "unknown",
};
67

68
enum {
69
  DND_ACTIVE_CODE,
70
  DND_CONN_ACTIVE_CODE,
71
};
72

73
enum {
74
  DND_CREATE,
75
  DND_ADD,
76
  DND_DROP,
77
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq);
110

111
#ifdef _GRANT
112
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
113
#else
114
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
115
#endif
116

117
/*
 * Registers the dnode object type with the SDB and installs every message
 * handler and show-table callback implemented in this file.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* SDB callbacks for SDB_DNODE rows; rows are keyed by an int32 id. */
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };

  /* dnode membership and bookkeeping */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);

  /* periodic reports coming from dnodes */
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);

  /* encryption keys */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_KEY_EXPIRATION, mndProcessAlterKeyExpirationReq);

  /* TLS reload */
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  /* SHOW-style retrieval of configs and dnodes */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
156

157
/* Counterpart of mndInitDnode(); there is nothing to tear down here. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
468,366✔
158

159
/*
 * SDB deploy callback: creates the very first dnode (id 1) describing the
 * local endpoint and submits it through a "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise an error code (terrno is used when a
 * callee returned NULL and left a reason there).
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    /* Never report success without a machine id, even if tGetMachineId() returned 0. */
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  /*
   * Once appended, the raw is treated as belonging to the transaction (the
   * original code also stopped freeing it after this point).  Drop our owning
   * pointer *before* the next fallible call so a failure there cannot make
   * _OVER free a raw the transaction still references.
   */
  SSdbRaw *pCommitRaw = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
210

211
/*
 * SDB encode callback: serialises a dnode object into a freshly allocated
 * raw record (version TSDB_DNODE_VER_NUMBER).
 *
 * Returns the raw on success; on failure the raw is freed, terrno holds the
 * reason and NULL is returned.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  /* code / lino / dataPos are referenced by the SDB_SET_* helper macros. */
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  /* Assume failure until every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  /* Two zero-length placeholders; the decoder reads them as (len, bytes) pairs. */
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
243

244
/*
 * SDB decode callback: rebuilds a dnode row from a raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER.  For versions above 1 two
 * (int16 length, bytes) pairs trail the reserved area; they are read and
 * discarded.  After a successful decode tmsgUpdateDnodeInfo() is given the
 * chance to rewrite the fqdn/port.
 *
 * Returns the row on success; on failure the row is freed, terrno holds the
 * reason and NULL is returned.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  /* code / lino / dataPos are referenced by the SDB_GET_* helper macros. */
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  /* Assume failure until the whole record has been parsed. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) {
    goto _OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    /* Skip the two trailing length-prefixed blobs without storing them. */
    for (int32_t blob = 0; blob < 2; ++blob) {
      int16_t skipLen = 0;
      SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
      SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    }
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
295

296
/*
 * SDB insert callback: a newly inserted dnode starts as "status not received"
 * and gets its "fqdn:port" endpoint string derived from fqdn and port.
 * Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  /* Format into a scratch buffer first, then copy into the row. */
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
  return 0;
}
305

306
/* SDB delete callback: only traces the event; always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  const int32_t rc = 0;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return rc;
}
310

311
/*
 * SDB update callback: folds the mutable fields of pNew into the live row
 * pOld.  updateTime is always taken over; machineId only in builds that
 * define TD_ENTERPRISE or TD_ASTRA_TODO.  Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
319

320
/*
 * Looks up a dnode by id and returns it acquired (pair with
 * mndReleaseDnode()).
 *
 * On failure returns NULL and translates the SDB error in terrno:
 *   SDB_OBJ_NOT_THERE -> MND_DNODE_NOT_EXIST
 *   SDB_OBJ_CREATING  -> MND_DNODE_IN_CREATING
 *   SDB_OBJ_DROPPING  -> MND_DNODE_IN_DROPPING
 *   anything else     -> APP_ERROR (also logged as fatal)
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
      /* The message used to say "db", a leftover from the code it was copied from. */
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
    }
  }

  return pDnode;
}
338

339
/* Hands a dnode obtained from this module back to the mnode's SDB. */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
186,394,796✔
343

344
/*
 * Builds an endpoint set holding the dnode's fqdn/port.
 * The result of addEpIntoEpSet() is stored in terrno; the set is returned by
 * value either way.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet dnodeEps = {0};

  int32_t addCode = addEpIntoEpSet(&dnodeEps, pDnode->fqdn, pDnode->port);
  terrno = addCode;

  return dnodeEps;
}
349

350
/*
 * Convenience wrapper: acquire the dnode with the given id, build its
 * endpoint set, release it again.  Returns a zeroed set when the dnode
 * cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     result = {0};
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);

  if (pTarget != NULL) {
    result = mndGetDnodeEpset(pTarget);
    mndReleaseDnode(pMnode, pTarget);
  }

  return result;
}
360

361
/*
 * Scans the dnodes returned by sdbFetch() for one whose endpoint string
 * equals pEpStr (case-insensitive, at most TSDB_EP_LEN chars).
 *
 * Returns the matching dnode still acquired (caller releases it), or NULL
 * with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pCandidate = NULL;
  void      *pCursor = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pCandidate);

  while (pCursor != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      /* Found: stop iterating but keep the reference for the caller. */
      sdbCancelFetch(pSdb, pCursor);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
381

382
/*
 * Same endpoint lookup as mndAcquireDnodeByEp(), but iterating with
 * sdbFetchAll() so the object status is reported instead of filtered.
 *
 * Returns the matching dnode still acquired, or NULL.  Unlike the sibling
 * function, terrno is not touched when nothing matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;

  for (void *pCursor = NULL;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;

    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &rowStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
402

403
/* Returns sdbGetSize() for the SDB_DNODE table of this mnode. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
407

408
/* Returns sdbGetSize() for the SDB_DB table of this mnode. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
412

413
/*
 * A dnode counts as online while the absolute distance between its
 * lastAccessTime and curMs does not exceed tsStatusTimeoutMs.
 *
 * Side effect: when the dnode is found offline, has a positive rebootTime
 * and is still marked DND_REASON_ONLINE, its offlineReason is switched to
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  const int64_t silentMs = TABS(pDnode->lastAccessTime - curMs);

  if (silentMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->offlineReason == DND_REASON_ONLINE && pDnode->rebootTime > 0) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
423

424
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) to pDnodeEps for every
 * dnode returned by sdbFetchAll().
 *
 * Best effort: a failed push is logged and the scan continues.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Copy everything we need while the reference is still held. */
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be dereferenced after the release: use the copied id. */
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,176,751✔
450

451
/*
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) to
 * pDnodeInfo for every dnode returned by sdbFetchAll().
 *
 * Returns 0 on success.  If a push fails, the iteration is cancelled and the
 * terrno left by taosArrayPush() is returned; entries pushed so far stay in
 * the array.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    /* Zero-initialise so no indeterminate bytes get copied into the array. */
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    /* pDnode must not be dereferenced after the release: use the copied id. */
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
483

484
/*
 * Compares pCfg->monitorParas.<para> with the like-named local setting
 * <para>; on mismatch logs, sets terrno to `err` and returns `err` from the
 * enclosing function.
 *
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site.  Wrapped
 * in do { } while (0) so that it behaves as a single statement (safe inside
 * an unbraced if/else); use it with a trailing semicolon.
 *
 * NOTE(review): call sites pass DND_REASON_* values as `err`, so terrno ends
 * up holding an offline-reason value here - confirm that is intended.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = err;                                                                                        \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
490

491
/*
492
 * Cached timezone offset to avoid DST-transition race:
493
 * dnode computes offset at send-time; if mnode recomputes
494
 * at receive-time the value may differ during the ~1 s
495
 * DST switchover, causing a spurious timezone mismatch.
496
 *
497
 * Keep both the previous and current cached offsets so
498
 * that during a DST transition window dnodes reporting
499
 * either the old or the new offset are both accepted.
500
 * Refresh at most once per 60 s; the previous offset
501
 * is only accepted for a short grace period (3 min)
502
 * to avoid permanently weakening the check.
503
 */
504
/* How long a cached offset is trusted before it is recomputed. */
#define TZ_CACHE_REFRESH_MS  60000
/* How long the previous offset is still accepted after a change. */
#define TZ_PREV_GRACE_MS    180000

/* One coherent copy of the shared timezone-offset cache below. */
typedef struct {
  int64_t offset;      /* copy of tsCachedTzOffset */
  int64_t offsetPrev;  /* copy of tsCachedTzOffsetPrev */
  int64_t refreshMs;   /* copy of tsCachedTzOffsetMs */
  int64_t prevSetMs;   /* copy of tsCachedPrevSetMs */
  int8_t  hasPrev;     /* copy of tsCachedHasPrev */
} STzSnapshot;

/* Shared cache state; written only while tsTzSeq is odd. */
static int64_t tsCachedTzOffset = 0;     /* current local timezone offset */
static int64_t tsCachedTzOffsetPrev = 0; /* offset in effect before the last change */
static int64_t tsCachedTzOffsetMs = 0;   /* time of the last refresh, 0 = never */
static int64_t tsCachedPrevSetMs = 0;    /* time tsCachedTzOffsetPrev was recorded */
static int8_t  tsCachedHasPrev = 0;      /* non-zero once a previous offset exists */

/*
 * Seqlock sequence: even = stable, odd = write in
 * progress.  CAS from even→odd grants single-writer
 * access; store even+2 publishes the new snapshot.
 */
static int64_t tsTzSeq = 0;
525

526
/*
527
 * Read a consistent snapshot of the tz cache.
528
 * Spins while a writer is active (odd seq) or
529
 * if the snapshot was torn (seq changed).
530
 */
NEW
531
/*
 * Copies the tz cache into *s as one coherent snapshot (seqlock reader).
 *
 * Retries while a writer is active (odd sequence) or when the sequence
 * changed during the copy.  On return *s is always fully populated.
 *
 * The previous do { ... continue; ... } while (seq changed) form was broken:
 * `continue` in a do-while jumps to the loop condition, so with an odd and
 * unchanged sequence the loop exited without ever writing *s.
 */
static void mndReadTzSnapshot(STzSnapshot *s) {
  for (;;) {
    int64_t seq = atomic_load_64(&tsTzSeq);
    if (seq & 1) {
      /* Writer in progress: start over, nothing has been copied yet. */
      continue;
    }

    s->offset     = atomic_load_64(&tsCachedTzOffset);
    s->offsetPrev = atomic_load_64(&tsCachedTzOffsetPrev);
    s->refreshMs  = atomic_load_64(&tsCachedTzOffsetMs);
    s->prevSetMs  = atomic_load_64(&tsCachedPrevSetMs);
    s->hasPrev    = atomic_load_8(&tsCachedHasPrev);

    /* Unchanged sequence => no writer interleaved with the loads above. */
    if (atomic_load_64(&tsTzSeq) == seq) {
      return;
    }
  }
}
×
543

544
/*
545
 * Try to refresh the tz cache.  Uses CAS on tsTzSeq
546
 * to ensure single-writer; if another thread is
547
 * already refreshing, this is a harmless no-op.
548
 */
NEW
549
static void mndRefreshTzCache(int64_t nowMs) {
×
NEW
550
  int64_t seq = atomic_load_64(&tsTzSeq);
×
NEW
551
  if (seq & 1) return;
×
NEW
552
  if (atomic_val_compare_exchange_64(&tsTzSeq, seq, seq + 1) != seq) {
×
NEW
553
    return;
×
554
  }
555

556
  /* seq is now odd — we are the sole writer */
NEW
557
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
558
  int64_t offset = (int64_t)taosGetLocalTimezoneOffset(&code);
×
NEW
559
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
560
    mError("failed to get local timezone offset since %s", tstrerror(code));
×
561
    /* rollback: restore even seq */
NEW
562
    atomic_store_64(&tsTzSeq, seq);
×
NEW
563
    return;
×
564
  }
565

NEW
566
  int64_t oldMs = atomic_load_64(&tsCachedTzOffsetMs);
×
NEW
567
  int64_t oldOff = atomic_load_64(&tsCachedTzOffset);
×
NEW
568
  if (oldMs != 0 && offset != oldOff) {
×
569
    /* offset changed (DST edge) — keep old one */
NEW
570
    atomic_store_64(&tsCachedTzOffsetPrev, oldOff);
×
NEW
571
    atomic_store_64(&tsCachedPrevSetMs, nowMs);
×
NEW
572
    atomic_store_8(&tsCachedHasPrev, 1);
×
573
  }
NEW
574
  atomic_store_64(&tsCachedTzOffset, offset);
×
NEW
575
  atomic_store_64(&tsCachedTzOffsetMs, nowMs);
×
576

577
  /* publish: even seq+2 => readers see new state */
NEW
578
  atomic_store_64(&tsTzSeq, seq + 2);
×
579
}
580

NEW
581
static bool mndMatchTzSnapshot(const STzSnapshot *s, int64_t dnodeOff,
×
582
                               int64_t nowMs) {
NEW
583
  if (dnodeOff == s->offset) return true;
×
NEW
584
  if (s->hasPrev && nowMs - s->prevSetMs < TZ_PREV_GRACE_MS &&
×
NEW
585
      dnodeOff == s->offsetPrev) {
×
NEW
586
    return true;
×
587
  }
NEW
588
  return false;
×
589
}
590

NEW
591
static bool mndCheckTimezoneOffset(int64_t dnodeOffset) {
×
NEW
592
  STzSnapshot snap;
×
NEW
593
  int64_t     nowMs = taosGetTimestampMs();
×
594

NEW
595
  mndReadTzSnapshot(&snap);
×
596

NEW
597
  if (snap.refreshMs == 0 || nowMs - snap.refreshMs >= TZ_CACHE_REFRESH_MS) {
×
NEW
598
    mndRefreshTzCache(nowMs);
×
NEW
599
    mndReadTzSnapshot(&snap);
×
600
  }
601

NEW
602
  if (mndMatchTzSnapshot(&snap, dnodeOffset, nowMs))
×
NEW
603
    return true;
×
604

605
  /*
606
   * dnodeOffset doesn't match — force an immediate
607
   * refresh in case we are stale after a DST switch,
608
   * then re-check.
609
   */
NEW
610
  mndRefreshTzCache(nowMs);
×
NEW
611
  mndReadTzSnapshot(&snap);
×
612

NEW
613
  return mndMatchTzSnapshot(&snap, dnodeOffset, nowMs);
×
614
}
615

616
/*
 * Validates the cluster configuration reported by a dnode against the local
 * settings.  Checks run in a fixed order and stop at the first mismatch.
 *
 * Returns DND_REASON_ONLINE when everything matches, otherwise the
 * DND_REASON_* value of the first mismatch, with terrno set.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* Monitor settings: each macro returns from this function on mismatch. */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
   * The statusInterval comparison is intentionally disabled:
   *
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  /* A different timezone string is tolerated as long as the offset agrees. */
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0 && !mndCheckTimezoneOffset(pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64
           " inconsistent with cluster %s",
           pDnode->id, pCfg->timezone, pCfg->checkTime, tsTimezoneStr);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  /* Normalise the local switch to 0/1 before comparing. */
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList,
           localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* Key state/checksum are only compared while no encryption run is flagged. */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting)) {
    bool keyStatDiffers = pCfg->encryptionKeyStat != tsEncryptionKeyStat;
    bool keySumDiffers = pCfg->encryptionKeyChksum != tsEncryptionKeyChksum;
    if (keyStatDiffers || keySumDiffers) {
      mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
             pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
      terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
      return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
    }
  }

  return DND_REASON_ONLINE;
}
684

685
/*
 * Average progress per millisecond between two samples:
 *   (currentCount - lastCount) / (currentTimeMs - lastTimeMs)
 *
 * Returns 0.0 unless both the time and the count strictly advanced.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs <= lastTimeMs) {
    return 0.0;
  }
  if (currentCount <= lastCount) {
    return 0.0;
  }

  const double countGain = (double)(currentCount - lastCount);
  const double elapsedMs = (double)(currentTimeMs - lastTimeMs);
  return countGain / elapsedMs;
}
695

696
/*
 * Folds the load reported for one vnode replica (pVload) into the mnode's
 * record of that replica (pGid).
 *
 * Progress counters are always copied.  While commit is ahead of applied,
 * the applied-rate estimate and its timestamp are maintained.  Role-related
 * fields are copied only when a change is detected.
 *
 * Returns true when role/restore/canRead/startTime changed, else false.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  /* A reported term of -1 is ignored for the term comparison. */
  const bool termMoved = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleChanged =
      (pGid->syncState != pVload->syncState) || termMoved || (pGid->roleTimeMs != pVload->roleTimeMs);

  const bool applyLagging = pVload->syncCommitIndex > pVload->syncAppliedIndex;
  if (applyLagging && pGid->lastSyncAppliedIndexUpdateTime == 0) {
    /* First lagging sample: only start the clock. */
    pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
  } else if (applyLagging && pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
    const int64_t sampleMs = taosGetTimestampMs();
    pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, sampleMs,
                                        pGid->lastSyncAppliedIndexUpdateTime);
    pGid->lastSyncAppliedIndexUpdateTime = sampleMs;
  }

  /* Counters that are taken over unconditionally. */
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->syncTotalIndex = pVload->syncTotalIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;

  const bool snapSeqInRange = (pVload->snapSeq > 0) && (pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END);
  if (snapSeqInRange || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  const bool flagsChanged = (pGid->syncRestore != pVload->syncRestore) ||
                            (pGid->syncCanRead != pVload->syncCanRead) ||
                            (pGid->startTimeMs != pVload->startTimeMs);
  if (!roleChanged && !flagsChanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
745

746
// Refresh the cached mnode object from the mnode load carried by a status message.
//
// The cached fields are overwritten only when the reported sync state, sync term,
// role time or restore flag differs from the stored value. A reported syncTerm of
// -1 is never counted as a term change.
//
// Returns true when pObj was modified, false when nothing differed.
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool syncStateDiffers = (pObj->syncState != pMload->syncState);
  const bool syncTermDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleTimeDiffers = (pObj->roleTimeMs != pMload->roleTimeMs);
  const bool restoreDiffers = (pObj->syncRestore != pMload->syncRestore);

  if (!(syncStateDiffers || syncTermDiffers || roleTimeDiffers || restoreDiffers)) {
    return false;
  }

  // Log the old values before they are overwritten below.
  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
763

764
extern char   *tsMonFwUri;
765
extern char   *tsMonSlowLogUri;
766
// Handle a statistics report: decode it and forward its content to the monitor
// endpoint selected by the report type.
//
// pReq carries the serialized SStatisReq in pCont/contLen.
// Returns 0 on success, or the deserialization error code (also stored in terrno
// through TAOS_RETURN).
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  int32_t code = tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq);
  if (code != 0) {
    // Release whatever the decoder attached to the request before failing, the
    // same way mndProcessNotifyReq cleans up after a failed deserialization.
    tFreeSStatisReq(&statisReq);
    TAOS_RETURN(code);
  }

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  // Report types other than the two below are dropped without forwarding.
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
786

787
// Handle a single audit record sent to the mnode.
//
// The record is decoded and handed to auditAddRecord only when auditing is enabled
// and the audit level is at least AUDIT_LEVEL_DATA; otherwise the request is
// acknowledged without being decoded.
//
// Returns 0 on success, or the deserialization error code (also stored in terrno
// through TAOS_RETURN).
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!(tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA)) {
    return 0;
  }

  SMnode   *pMnode = pReq->info.node;
  SAuditReq auditReq = {0};

  int32_t code = tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq);
  if (code != 0) {
    // Free anything the decoder attached before it failed, matching the cleanup
    // mndProcessNotifyReq performs on a failed deserialization.
    tFreeSAuditReq(&auditReq);
    TAOS_RETURN(code);
  }

  mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

  auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                 auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);

  tFreeSAuditReq(&auditReq);
  return 0;
}
804

805
// Handle a batch of audit records sent to the mnode.
//
// Each entry of the decoded array is handed to auditAddRecord, in array order,
// only when auditing is enabled and the audit level is at least AUDIT_LEVEL_DATA;
// otherwise the request is acknowledged without being decoded.
//
// Returns 0 on success, or the deserialization error code (also stored in terrno
// through TAOS_RETURN).
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (!(tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA)) {
    return 0;
  }

  SMnode        *pMnode = pReq->info.node;
  SBatchAuditReq auditReq = {0};

  int32_t code = tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq);
  if (code != 0) {
    // Free anything the decoder attached before it failed, matching the cleanup
    // mndProcessNotifyReq performs on a failed deserialization.
    tFreeSBatchAuditReq(&auditReq);
    TAOS_RETURN(code);
  }

  int32_t nAudit = taosArrayGetSize(auditReq.auditArr);

  for (int32_t i = 0; i < nAudit; ++i) {
    SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, i);
    mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);

    auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
                   audit->duration, audit->affectedRows);
  }

  tFreeSBatchAuditReq(&auditReq);
  return 0;
}
827

828
// Queue a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue so that the
// info of pDnode (its id and machine id) is processed by the write path.
//
// pMnode supplies the message callbacks used to enqueue the request.
// pDnode is the dnode whose id and machineId are copied into the request.
//
// Returns 0 when the message was queued, otherwise an error code (also stored in
// terrno through TAOS_RETURN).
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass (NULL buffer) only measures the encoded size.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // A zero-length result used to leave code == 0, so the caller saw success even
    // though nothing was queued. Map it to an error exactly like the size pass above.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    // NOTE(review): pReq is not released on this path; the matching release call
    // for rpcMallocCont is not visible in this part of the file - confirm and free it.
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): who owns pReq when tmsgPutToQueue fails is not visible here - confirm.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
858

859
// Handle TDMT_MND_UPDATE_DNODE_INFO: stamp the dnode's updateTime and write the
// encoded dnode object to the sdb through a transaction.
//
// pReq carries the serialized SDnodeInfoReq in pCont/contLen.
// Returns 0 when the transaction was prepared, otherwise an error code (also
// stored in terrno through TAOS_RETURN).
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // For the three NULL results below: TAOS_CHECK_EXIT(terrno) only leaves the
  // function when terrno holds an error, so a NULL result with terrno == 0 used to
  // fall through and dereference the NULL pointer. Jump explicitly, with the same
  // fallback code the rest of this file uses for NULL results.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  // Clear the local pointer so the sdbFreeRaw call at _exit does not free the raw
  // that has been appended to the transaction.
  pCommitRaw = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  TAOS_RETURN(code);
}
905

906
// Handle the periodic status message sent by a dnode.
//
// Steps, in order:
//   1. decode the request and reject it when it names a different cluster;
//   2. locate the dnode object (by endpoint when dnodeId is 0, otherwise by id with
//      endpoint-based fallbacks);
//   3. work out whether a full check/response is needed (offline, reboot, version
//      counters, white-list versions, encryption key, audit settings changed);
//   4. reject the message when the dnode clock differs too much from the local one;
//   5. copy the reported vnode / mnode / qnode loads into the cached objects;
//   6. when a check is needed, validate software version and cluster id, refresh
//      the dnode object and build the SStatusRsp attached to pReq->info;
//   7. record the access and mark the dnode online.
//
// Returns the error code on failure; on success returns mndUpdClusterInfo(pReq).
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;
  int32_t    lino = 0;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), &lino, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  if (statusReq.dnodeId == 0) {
    // The dnode does not know its id yet: look it up by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        if (pDnode == NULL) {
          // code is 0 at this point (the deserialization above succeeded), so a bare
          // goto reported success for a dnode that could not be found. Return the
          // lookup error instead.
          code = (err != 0) ? err : TSDB_CODE_MND_RETURN_VALUE_NULL;
          lino = __LINE__;
          goto _OVER;
        }
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditInfoChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  // The audit db is needed both for the token-based and the save-in-self audit
  // modes; it is released once both comparisons below are done.
  SDbObj *pDb = NULL;
  if (tsAuditUseToken || tsAuditSaveInSelf) {
    pDb = mndAcquireAuditDb(pMnode);
  }
  if (tsAuditUseToken) {
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = mndGetAuditUser(pMnode, auditUser);
    if (ret != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      ret = mndGetUserActiveToken(auditUser, auditToken);
      if (ret != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:xxxx, since %s", pDnode->id, tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:xxxx", pDnode->id);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditInfoChanged = true;
      }
    }
  }

  SEpSet  auditVnodeEpSet = {0};
  int32_t auditVgId = 0;
  if (tsAuditSaveInSelf) {
    if (pDb != NULL) {
      // Use the first vgroup found that belongs to the audit db.
      void   *pIter = NULL;
      SVgObj *pVgroup = NULL;
      while (1) {
        pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) break;

        if (mndVgroupInDb(pVgroup, pDb->uid)) {
          auditVnodeEpSet = mndGetVgroupEpset(pMnode, pVgroup);
          auditVgId = pVgroup->vgId;
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pVgroup);
          break;
        }
        sdbRelease(pMnode->pSdb, pVgroup);
      }
    }

    if (auditVnodeEpSet.numOfEps != statusReq.auditEpSet.numOfEps) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit epset num changed, auditNum:%d, inReq:%d", pDnode->id, auditVnodeEpSet.numOfEps,
             statusReq.auditEpSet.numOfEps);
    } else {
      for (int32_t i = 0; i < auditVnodeEpSet.numOfEps; i++) {
        if (strncmp(auditVnodeEpSet.eps[i].fqdn, statusReq.auditEpSet.eps[i].fqdn, TSDB_FQDN_LEN) != 0 ||
            auditVnodeEpSet.eps[i].port != statusReq.auditEpSet.eps[i].port) {
          // do not need to check InUse here, because inUse is not accurate at every time
          auditInfoChanged = true;
          mTrace("dnode:%d, audit epset changed at item:%d, fqdn:%s:%d:, inReq:%s:%d", pDnode->id, i,
                 auditVnodeEpSet.eps[i].fqdn, auditVnodeEpSet.eps[i].port, statusReq.auditEpSet.eps[i].fqdn,
                 statusReq.auditEpSet.eps[i].port);
          break;
        }
      }
    }

    if (auditVgId != statusReq.auditVgId) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit vgId changed, auditVgId:%d, inReq:%d", pDnode->id, auditVgId, statusReq.auditVgId);
    }
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditInfoChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  // Only format the timestamp when trace logging is enabled.
  if (mDebugFlag & DEBUG_TRACE) {
    (void)formatTimestampLocal(timestamp, sizeof(timestamp), statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  }
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Clock difference in whole seconds. llabs keeps the full 64-bit value; labs
  // takes a long, which is narrower than int64_t on platforms with a 32-bit long.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (llabs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%" PRId64 " since %s, limit %" PRId64 "s", statusReq.dnodeId,
           pMnode->clusterId, tstrerror(code), tsTimestampDeltaLimit);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  const int32_t numOfVloads = taosArrayGetSize(statusReq.pVloads);
  for (int32_t v = 0; v < numOfVloads; ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide counters are taken only from a replica reporting itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role times default to the dnode reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pVgDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pVgDb != NULL && pVgDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pVgDb->name,
                pVgDb->stateTs, curMs);
          pVgDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pVgDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      code = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        code = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
            "enableWhiteListChanged:%d auditDBChanged:%d auditInfoChanged:%d pMnode->ipWhiteVer:%" PRId64
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditInfoChanged,
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditInfoChanged || auditDBChanged) {
      if (tsAuditUseToken) {
        if (auditDB[0] != '\0') {
          mInfo("dnode:%d, set audit db:%s in process status rsp", statusReq.dnodeId, auditDB);
          tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
        }
        if (auditToken[0] != '\0') {
          mInfo("dnode:%d, set audit token:xxxx in process status rsp", statusReq.dnodeId);
          tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
        }
      }

      if (tsAuditSaveInSelf) {
        mInfo("dnode:%d, set audit epset and vgId:%d in process status rsp", statusReq.dnodeId, auditVgId);
        statusRsp.auditEpSet = auditVnodeEpSet;
        statusRsp.auditVgId = auditVgId;
      }
    }

    // First pass (NULL buffer) only measures the encoded size; do not allocate
    // from a negative size.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      lino = __LINE__;
      goto _OVER;
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Do not hand a NULL buffer to the serializer.
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // NOTE(review): pHead is not released on this path; the matching release call
      // for rpcMallocCont is not visible in this part of the file - confirm and free it.
      code = contLen;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req at line:%d since %s", statusReq.dnodeId, lino, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1257

1258
// Handle a notify message from a dnode: copy the reported time-series count of
// every listed vnode into its cached vgroup, then refresh the cluster info.
//
// Returns the deserialization error (also stored in terrno),
// TSDB_CODE_MND_DNODE_DIFF_CLUSTER when the message names another cluster, or the
// result of mndUpdClusterInfo otherwise. The decoded request is always freed.
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = 0;

  code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);
  if (code != 0) {
    terrno = code;
    goto _OVER;
  }

  // A cluster id of 0 in the request is accepted without comparison.
  const int64_t localClusterId = mndGetClusterId(pMnode);
  const bool    otherCluster = (notifyReq.clusterId != 0) && (notifyReq.clusterId != localClusterId);
  if (otherCluster) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
          notifyReq.clusterId, localClusterId, tstrerror(code));
    goto _OVER;
  }

  const int32_t numLoads = taosArrayGetSize(notifyReq.pVloads);
  for (int32_t idx = 0; idx < numLoads; ++idx) {
    SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
    SVgObj         *pVg = mndAcquireVgroup(pMnode, pLoad->vgId);
    if (pVg == NULL) {
      // Loads for vgroups that cannot be acquired are skipped.
      continue;
    }
    pVg->numOfTimeSeries = pLoad->nTimeSeries;
    mndReleaseVgroup(pMnode, pVg);
  }

  code = mndUpdClusterInfo(pReq);

_OVER:
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1291

1292
// Build a new dnode object from the create request and write it to the sdb through
// a "create-dnode" transaction.
//
// pMnode  owning mnode
// pReq    originating rpc message, attached to the transaction
// pCreate decoded create request supplying fqdn and port
//
// Returns 0 when the transaction was prepared, otherwise an error code.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t   code = -1;
  STrans   *pTrans = NULL;
  SSdbRaw  *pCommitRaw = NULL;
  SDnodeObj newDnode = {0};

  newDnode.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  const int64_t nowMs = taosGetTimestampMs();
  newDnode.createdTime = nowMs;
  newDnode.updateTime = nowMs;
  newDnode.port = pCreate->port;
  tstrncpy(newDnode.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(newDnode.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    // Prefer the recorded error; fall back to a generic code when none was set.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, newDnode.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pCommitRaw = mndDnodeActionEncode(&newDnode);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  // Clear the local pointer so the sdbFreeRaw call at _OVER does not free the raw
  // that has been appended to the transaction.
  pCommitRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);
  return code;
}
1332

1333
// Handle a dnode-list request: collect the id, fqdn and port of every dnode in the
// sdb and attach the serialized SDnodeListRsp to pReq->info.
//
// Returns 0 on success, otherwise an error code (also stored in terrno through
// TAOS_RETURN). The response list is always freed before returning.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    // Never report success for a failed allocation, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // First pass (NULL buffer) only measures the encoded size; do not allocate from
  // a non-positive size.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen <= 0) {
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // A zero-length result used to leave code == 0, i.e. success with no response
    // attached. Map it to an error instead.
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    // NOTE(review): pRsp is not released on this path; the matching release call
    // for rpcMallocCont is not visible in this part of the file - confirm and free it.
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1394

1395
void getSlowLogScopeString(int32_t scope, char *result) {
1,204✔
1396
  if (scope == SLOW_LOG_TYPE_NULL) {
1,204✔
1397
    (void)strncat(result, "NONE", 64);
×
1398
    return;
×
1399
  }
1400
  while (scope > 0) {
2,408✔
1401
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,204✔
1402
      (void)strncat(result, "QUERY", 64);
1,204✔
1403
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,204✔
1404
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1405
      (void)strncat(result, "INSERT", 64);
×
1406
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1407
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1408
      (void)strncat(result, "OTHERS", 64);
×
1409
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1410
    } else {
1411
      (void)printf("invalid slow log scope:%d", scope);
×
1412
      return;
×
1413
    }
1414

1415
    if (scope > 0) {
1,204✔
1416
      (void)strncat(result, "|", 64);
×
1417
    }
1418
  }
1419
}
1420

1421
/*
 * Handle a create-dnode request.
 *
 * Steps: grant checks, request decoding, privilege check, endpoint validation,
 * duplicate detection by "fqdn:port", then mndCreateDnode(). When the create
 * call succeeds the function reports TSDB_CODE_ACTION_IN_PROGRESS and sets
 * tsGrantHBInterval to 5. An audit record is written when the audit level is
 * at least AUDIT_LEVEL_SYSTEM, whether or not the create call succeeded.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS on success, otherwise an error code.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  char            ep[TSDB_EP_LEN];
  int32_t         code = -1;
  int32_t         lino = 0;
  const int64_t   startMs = taosGetTimestampMs();

  // Both grants must pass; the second is only consulted if the first passes.
  code = grantCheck(TSDB_GRANT_DNODE);
  if (code == 0) {
    code = grantCheck(TSDB_GRANT_CPU_CORES);
  }
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE),
                  &lino, _OVER);

  // An endpoint needs a non-empty FQDN and a port in (0, UINT16_MAX].
  // FQDN resolution checking (taosValidFqdn) is currently disabled here.
  const bool badEp = (createReq.fqdn[0] == 0) || (createReq.port <= 0) || (createReq.port > UINT16_MAX);
  if (badEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }

  // Refuse to create a dnode whose endpoint is already registered.
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)snprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed wall time in seconds since the request entered this handler.
    double duration = (double)(taosGetTimestampMs() - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1484

1485
// Actual restore-dnode implementation. When TD_ENTERPRISE is not defined the
// stub below supplies it; otherwise it is presumably provided by another
// translation unit (not visible from this file).
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for restore-dnode requests: forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub for builds without TD_ENTERPRISE: ignores the request and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1492

1493
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
10,150✔
1494
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1495
  int32_t  code = -1;
10,150✔
1496
  SSdbRaw *pRaw = NULL;
10,150✔
1497
  STrans  *pTrans = NULL;
10,150✔
1498
  int32_t  lino = 0;
10,150✔
1499

1500
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
10,150✔
1501
  if (pTrans == NULL) {
10,150✔
1502
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1503
    if (terrno != 0) code = terrno;
×
1504
    goto _OVER;
×
1505
  }
1506
  mndTransSetGroupParallel(pTrans);
10,150✔
1507
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
10,150✔
1508
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
10,150✔
1509

1510
  pRaw = mndDnodeActionEncode(pDnode);
10,150✔
1511
  if (pRaw == NULL) {
10,150✔
1512
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1513
    if (terrno != 0) code = terrno;
×
1514
    goto _OVER;
×
1515
  }
1516
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
10,150✔
1517
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
10,150✔
1518
  pRaw = NULL;
10,150✔
1519

1520
  pRaw = mndDnodeActionEncode(pDnode);
10,150✔
1521
  if (pRaw == NULL) {
10,150✔
1522
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1523
    if (terrno != 0) code = terrno;
×
1524
    goto _OVER;
×
1525
  }
1526
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
10,150✔
1527
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
10,150✔
1528
  pRaw = NULL;
10,150✔
1529

1530
  if (pSObj != NULL) {
10,150✔
1531
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
621✔
1532
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
621✔
1533
  }
1534

1535
  if (pMObj != NULL) {
10,150✔
1536
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
240✔
1537
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
240✔
1538
  }
1539

1540
  if (pQObj != NULL) {
10,150✔
1541
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
159✔
1542
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
159✔
1543
  }
1544

1545
  if (pBObj != NULL) {
10,150✔
1546
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
854✔
1547
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
854✔
1548
  }
1549

1550
  if (numOfVnodes > 0) {
9,296✔
1551
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
7,705✔
1552
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
7,705✔
1553
  }
1554

1555
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
9,296✔
1556

1557
  code = 0;
9,296✔
1558

1559
_OVER:
10,150✔
1560
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
10,150✔
1561
  mndTransDrop(pTrans);
10,150✔
1562
  sdbFreeRaw(pRaw);
10,150✔
1563
  TAOS_RETURN(code);
10,150✔
1564
}
1565

1566
/*
 * Report whether a dnode hosts nothing: no qnode, no snode, no mnode and no
 * vnodes. The checks run in that order and stop at the first hit.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  SQnodeObj *pQnode = NULL;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  bool       empty = false;

  // Short-circuit evaluation keeps the original lookup order and skips the
  // remaining lookups as soon as one kind of node is found.
  if ((pQnode = mndAcquireQnode(pMnode, dnodeId)) == NULL &&
      (pSnode = mndAcquireSnode(pMnode, dnodeId)) == NULL &&
      (pMnodeObj = mndAcquireMnode(pMnode, dnodeId)) == NULL) {
    empty = !(mndGetVnodesNum(pMnode, dnodeId) > 0);
  }

  // Each release is called unconditionally, including with NULL handles.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1591

1592
/*
 * Handle a drop-dnode request.
 *
 * The dnode is looked up by id first and, if that fails, by the "fqdn:port"
 * endpoint carried in the request. The request is rejected when:
 *   - the dnode hosts the only mnode, or the mnode processing this request;
 *   - (USE_MOUNT) a mount exists on the dnode and the drop is not forced;
 *   - the dnode is online but a forced drop was requested;
 *   - one of its vnodes is offline, or the dnode itself is offline, and the
 *     drop is not forced.
 * "unsafe" implies "force".
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started,
 * otherwise an error code.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the operation checked is MND_OPER_DROP_MNODE although this
  // handler drops a dnode - confirm whether a dnode-specific operation is
  // intended before changing it.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL,
                  _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Fall back to a lookup by endpoint; keep the id-lookup error to report
    // if that fails as well.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // The FQDN comes from the request, so it must be passed as an argument and
    // never as the format string; the port has to be part of the endpoint.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode that was actually found: after the endpoint
  // fallback it can differ from dropReq.dnodeId.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // Progress trace on the normal path, so it is not logged as an error.
  mInfo("vnode num:%d", numOfVnodes);

  // Look for a vnode of this dnode whose sync state is offline.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mInfo("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the scan early: the iterator has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)snprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;  // milliseconds -> seconds
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1728

1729
/*
 * Send a create-encrypt-key request to the selected dnodes.
 *
 * dnodeId:  -1 or 0 selects every dnode, any other value selects that dnode
 *           only. Dnodes whose offlineReason is not DND_REASON_ONLINE are
 *           skipped.
 * pDcfgReq: config request to send; its value is the key and must be between
 *           ENCRYPT_KEY_LEN_MIN and ENCRYPT_KEY_LEN characters long.
 *
 * encryptMgmt.encrypting is used as a "busy" flag: it is taken here and is
 * released again either here (when no request was sent) or by the response
 * handler. encryptMgmt.nEncrypt counts the requests that were sent.
 *
 * Returns 0 on success or an error code; on error terrno is set if it was 0.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Take the busy flag; a non-zero previous value means another request is
  // still being processed.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // Abandoning the scan: free the unsent buffer, drop the object
          // reference and cancel the iterator. Leave through the post-loop
          // check so the busy flag is cleared when nothing was sent.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request in flight means no response will ever clear the busy flag.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1806

1807
/*
 * Handle a create-encrypt-key request.
 *
 * With TD_ENTERPRISE or TD_ASTRA_TODO defined: decode the config request,
 * check the MND_OPER_CONFIG_DNODE privilege, require the config name to be
 * "encrypt_key" (case-insensitive) and pass the key on to
 * mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
 * TSDB_CODE_MND_INTERNAL_ERROR.
 *
 * Without those macros the function does nothing and returns 0.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 includes the terminator, so only the exact name matches.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Copy everything still needed out of cfgReq before releasing it, so that
  // nothing is read from the request after it has been freed.
  SDCfgDnodeReq dcfgReq = {0};
  const int32_t dnodeId = cfgReq.dnodeId;
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);

  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1836

1837
/*
 * Handle one dnode's response to a create-encrypt-key request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on the response
 * code, and clears encryptMgmt.encrypting once the number of responses
 * (successes plus failures) has reached encryptMgmt.nEncrypt.
 *
 * Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Increment the counter matching this response and read the other one, so
  // the completion test below sees the total number of responses. Using only
  // the incremented counter would never reach nReq when successes and
  // failures are mixed, leaving the busy flag set.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1861

1862
/*
 * Fill pBlock with a fixed list of (name, value) configuration rows:
 * statusIntervalMs, timezone, locale, charset, monitor, monitorInterval,
 * slowLogThreshold, slowLogMaxLen and slowLogScope.
 *
 * Column 0 receives the option name and column 1 the value, both as varstr.
 * `rows` is not consulted. Returns the number of rows written (also added to
 * pShow->numOfRows); if setting a cell fails, the rows completed so far are
 * returned and the failure is logged.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char    scopeStr[64] = {0};
  char    buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char    bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  int32_t totalRows = 0;
  int32_t numOfRows = 0;
  int32_t code = 0;
  int32_t lino = 0;

  // Collect the name/value pairs; totalRows is the next free slot.
  cfgOpts[totalRows] = "statusIntervalMs";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);

  cfgOpts[totalRows] = "timezone";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[totalRows] = "locale";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[totalRows] = "charset";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[totalRows] = "monitor";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[totalRows] = "monitorInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[totalRows] = "slowLogThreshold";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[totalRows] = "slowLogMaxLen";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  // The scope is a bitmask and is rendered as text first.
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[totalRows] = "slowLogScope";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  // Emit one block row per collected pair.
  for (int32_t i = 0; i < totalRows; ++i) {
    SColumnInfoData *pNameCol = taosArrayGet(pBlock->pDataBlock, 0);
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pNameCol, numOfRows, (const char *)buf, false), &lino, _OVER);

    SColumnInfoData *pValueCol = taosArrayGet(pBlock->pDataBlock, 1);
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
    TAOS_CHECK_GOTO(colDataSetVal(pValueCol, numOfRows, (const char *)bufVal, false), &lino, _OVER);

    numOfRows++;
  }

_OVER:
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1933

1934
// Cancel hook for the configs listing: it has nothing to do, both arguments
// are ignored.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1935

1936
/*
 * Fill pBlock with up to `rows` dnode rows, continuing from pShow->pIter.
 *
 * Columns, in order: id, endpoint, number of vnodes, supported vnodes, status,
 * create time, reboot time, offline reason (empty when online) and, with
 * TD_ENTERPRISE, the machine id.
 *
 * Status is "ready", "creating" or "dropping" according to the sdb object
 * status; for a dnode that is not online it becomes "creating*", "dropping*"
 * or "offline".
 *
 * Returns the number of rows written (also added to pShow->numOfRows). On a
 * failure the rows completed so far are returned and the error is logged.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    // cols now indexes the endpoint column, whose schema width bounds the copy.
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);  // vnode count, despite the name
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason: the varstr buffer is sized from the reason text.
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
    if (b == NULL) {
      // Allocation failed: stop here instead of writing through a NULL pointer.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      sdbRelease(pSdb, pDnode);
      goto _OVER;
    }
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the temporary before the check so it is not leaked on failure.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
2018

2019
// Cancel hook for the dnode listing: cancels the pending SDB_DNODE fetch
// identified by pIter on this mnode's sdb.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
2023

2024
/*
 * Collect the FQDN of every dnode.
 *
 * Returns a new array of heap-allocated strings (one `char *` per element);
 * the caller owns the array and the strings. Returns NULL if the array cannot
 * be created or a string cannot be duplicated. An FQDN that cannot be pushed
 * into the array is logged and skipped.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error first, log while the object is still held, then
      // release it and cancel the iterator because the scan is abandoned.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      taosMemoryFreeClear(fqdn);  // not stored anywhere, so free it here
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    // Undo the partial result: free every stored string, then the array.
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
2066

2067
/*
 * Handle a key-sync request from a dnode.
 *
 * With TD_ENTERPRISE and TD_HAS_TAOSK: compare the key version reported by the
 * dnode with the mnode's own. If they differ the response carries
 * needUpdate = 1 together with the keys, algorithm and timestamps currently
 * loaded on the mnode (all empty/zero when no keys are loaded); otherwise
 * needUpdate = 0. Without those macros the response always carries
 * keyVersion = 0 and needUpdate = 0.
 *
 * The serialized response is attached to pReq->info. Returns 0 on success or
 * an error code.
 */
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Snapshot of the mnode's key material; everything stays empty/zero unless
  // keys are loaded.
  // NOTE(review): tsCfgAlgorithm, tsMetaAlgorithm and tsEncryptFileVersion were
  // previously copied into locals that were never read; they are not part of
  // the response - confirm that is intended.
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
  int32_t algorithm = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
    keyVersion = tsEncryptKeyVersion;
    tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
    algorithm = tsEncryptAlgorithmType;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
  } else {
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
  }

  // Send the keys only when the dnode's version differs from the mnode's.
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);

    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 0;
  }
#else
  // Build without key support: always answer "nothing to update".
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
2177

2178
// Handler for the key-sync response message: the message is not inspected
// and the handler always reports success.
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
×
2179

2180
// Resolve an sdb row object to the dnode object associated with it.
//
// p    - row object whose concrete type is described by `type`; may be NULL
// type - sdb table the row came from
//
// For SDB_DNODE the row itself is returned; for SDB_QNODE / SDB_SNODE /
// SDB_BNODE the row's pDnode member is returned. NULL is returned when p is
// NULL or when `type` is any other value.
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pResult = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pResult = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pResult = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pResult = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pResult = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pResult;
}
2197
// Append the endpoint set of every row of sdb table `type` to pAddr.
//
// pMnode - mnode whose sdb is iterated
// type   - sdb table to walk (rows are mapped to dnodes by getDnodeObjByType)
// pAddr  - array of SEpSet that receives one entry per row
//
// Rows for which no dnode object can be resolved are logged and skipped.
// Returns 0 on success. If appending to pAddr fails, the iteration is
// cancelled and an error code is returned (terrno, or TSDB_CODE_OUT_OF_MEMORY
// when terrno is not set); entries appended so far stay in pAddr.
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Capture the error before logging, and never fall through with code == 0:
      // doing so would release pObj a second time on the normal path below.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // Leaving the loop before sdbFetch() returns NULL: the iterator must be
      // cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2227

2228
// Collect the endpoint sets of all qnode, snode, bnode and dnode rows (in that
// order) into pAddr.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
// code of the first mndGetAllNodeAddrByType() call that fails, or 0 when all
// four tables were walked successfully.
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  int32_t code = 0;
  int32_t errLine = 0;

  // Order matters only in that it fixes the order of entries in pAddr.
  const ESdbType walkOrder[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  const size_t   numOfTypes = sizeof(walkOrder) / sizeof(walkOrder[0]);

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &errLine, _error);
  }

  for (size_t idx = 0; idx < numOfTypes; ++idx) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, walkOrder[idx], pAddr), &errLine, _error);
  }

_error:
  return code;
}
2250

2251
// Handle a "reload dnode TLS config" request: after deserializing the request
// and checking the MND_OPER_ALTER_DNODE_RELOAD_TLS privilege, send an empty
// TDMT_DND_RELOAD_DNODE_TLS message to every address returned by
// mndGetAllNodeAddr().
//
// Returns the deserialize / privilege / allocation error if one occurs before
// any message is sent. Otherwise sending is best-effort: each failed send is
// logged and the returned code is that of the last send attempted (or of
// mndGetAllNodeAddr() when nothing was sent).
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  // Initialised up front: every `goto _OVER` below reaches taosArrayDestroy(pAddr),
  // so the pointer must never be indeterminate.
  SArray *pAddr = NULL;

  mInfo("start to reload dnode tls config");

  SMCfgDnodeReq req = {0};
  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    // Do not depend on terrno being set; a zero code here would look like success.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  code = mndGetAllNodeAddr(pMnode, pAddr);
  if (code != 0) {
    // Keep the best-effort behaviour (addresses collected so far are still
    // notified), but make the failure visible instead of silently overwriting it.
    mError("failed to get all node addrs for reload tls since %s", tstrerror(code));
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    code = tmsgSendReq(pEpSet, &rpcMsg);
    if (code != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2289

2290
// Handle the response to a reload-TLS request: log the outcome carried in
// pRsp->code. The handler itself always returns 0.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
    return 0;
  }

  mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  return 0;
}
2299

2300
// Validate an alter-encrypt-key request and forward it to every online dnode.
//
// pReq      - original rpc request (used for the mnode handle and trace id)
// pAlterReq - deserialized request; keyType must be 0 (SVR_KEY) or 1 (DB_KEY)
//             and strlen(newKey) must be within
//             [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]
//
// A copy of the request without the sql text is serialized once per online
// dnode and sent as TDMT_MND_ALTER_ENCRYPT_KEY. Offline dnodes are skipped with
// a warning. Allocation and send failures for a single dnode are logged and do
// not stop the broadcast or change the return value.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for a bad key type,
// TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN for a bad key length, or the
// serialization error (terrno is set to it if terrno was 0).
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // NOTE(review): not referenced directly below; kept because the mG* logging
  // macros appear to read a local named `trace` - confirm against their definition.
  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // The encoded size depends only on alterKeyReq, so probe it once and reject
  // a failed probe before it is used as an allocation size.
  int32_t reqLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnode);
    void  *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      // Best effort: skip this dnode, but leave a trace of why it was skipped.
      mGError("msg:%p, failed to alloc alter encrypt_key req for dnode:%d", pReq, pDnode->id);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, reqLen, &alterKeyReq);
    if (bufLen <= 0) {
      // A zero length must not be reported as success, the unsent buffer must be
      // freed, and the iterator must be cancelled because the loop is abandoned.
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2375

2376
// Entry point for the alter-encrypt-key request.
//
// Checks the MND_OPER_CONFIG_DNODE privilege, deserializes the request and,
// when built with TD_ENTERPRISE and TD_HAS_TAOSK, hands it to
// mndProcessAlterEncryptKeyReqImpl() and writes an audit record on success.
// Other builds answer TSDB_CODE_OPS_NOT_SUPPORT. The deserialized request is
// always freed before returning.
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              errLine = 0;
  SMAlterEncryptKeyReq alterReq = {0};
  SMnode              *pMnode = pReq->info.node;

  // Only callers holding the config-dnode privilege may change encryption keys
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &errLine, _OVER);

  // Decode the request body
  if ((code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and broadcast to the dnodes; audit only a successful broadcast
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq);
  if (code == 0) {
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", alterReq.keyType == 0 ? "SVR_KEY" : "DB_KEY",
                alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Builds without enterprise key support reject the request
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&alterReq);
  TAOS_RETURN(code);
}
2417

2418
// Validate an alter-key-expiration request and forward it to every online dnode.
//
// pReq      - original rpc request (used for the mnode handle and trace id)
// pAlterReq - deserialized request; days must be >= 0 and strategy non-empty
//
// A copy of the request without the sql text is serialized once per online
// dnode and sent as TDMT_MND_ALTER_KEY_EXPIRATION. Offline dnodes are skipped
// with a warning. Allocation and send failures for a single dnode are logged
// and do not stop the broadcast or change the return value.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for invalid days / strategy, or
// the serialization error (terrno is set to it if terrno was 0).
static int32_t mndProcessAlterKeyExpirationReqImpl(SRpcMsg *pReq, SMAlterKeyExpirationReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // NOTE(review): not referenced directly below; kept because the mG* logging
  // macros appear to read a local named `trace` - confirm against their definition.
  const STraceId *trace = &pReq->info.traceId;

  // Validate days value
  if (pAlterReq->days < 0) {
    mGError("msg:%p, failed to alter key expiration since invalid days:%d, must be >= 0", pReq, pAlterReq->days);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate strategy
  if (strlen(pAlterReq->strategy) == 0) {
    mGError("msg:%p, failed to alter key expiration since empty strategy", pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  // Prepare SMAlterKeyExpirationReq for distribution to dnodes
  SMAlterKeyExpirationReq alterReq = {0};
  alterReq.days = pAlterReq->days;
  tstrncpy(alterReq.strategy, pAlterReq->strategy, sizeof(alterReq.strategy));
  alterReq.sqlLen = 0;
  alterReq.sql = NULL;

  // The encoded size depends only on alterReq, so probe it once and reject a
  // failed probe before it is used as an allocation size.
  int32_t reqLen = tSerializeSMAlterKeyExpirationReq(NULL, 0, &alterReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter key_expiration req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnode);
    void  *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      // Best effort: skip this dnode, but leave a trace of why it was skipped.
      mGError("msg:%p, failed to alloc alter key_expiration req for dnode:%d", pReq, pDnode->id);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    int32_t bufLen = tSerializeSMAlterKeyExpirationReq(pBuf, reqLen, &alterReq);
    if (bufLen <= 0) {
      // A zero length must not be reported as success, the unsent buffer must be
      // freed, and the iterator must be cancelled because the loop is abandoned.
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_KEY_EXPIRATION, .pCont = pBuf, .contLen = bufLen};
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter key_expiration req to dnode:%d, error:%s", pReq, pDnode->id,
              tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter key_expiration req to dnode:%d, days:%d, strategy:%s", pReq, pDnode->id,
             pAlterReq->days, pAlterReq->strategy);
    }

    sdbRelease(pSdb, pDnode);
  }

  mGInfo("msg:%p, successfully sent alter key expiration request to all dnodes, days:%d, strategy:%s", pReq,
         pAlterReq->days, pAlterReq->strategy);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2491

2492
// Entry point for the alter-key-expiration request.
//
// Checks the MND_OPER_CONFIG_DNODE privilege, deserializes the request and,
// when built with TD_ENTERPRISE and TD_HAS_TAOSK, hands it to
// mndProcessAlterKeyExpirationReqImpl() and writes an audit record
// ("<days> DAYS <strategy>") on success. Other builds answer
// TSDB_CODE_OPS_NOT_SUPPORT. The deserialized request is always freed before
// returning.
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 errLine = 0;
  SMAlterKeyExpirationReq alterReq = {0};
  SMnode                 *pMnode = pReq->info.node;

  // Only callers holding the config-dnode privilege may change key expiration
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &errLine, _OVER);

  // Decode the request body
  if ((code = tDeserializeSMAlterKeyExpirationReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and broadcast to the dnodes; audit only a successful broadcast
  code = mndProcessAlterKeyExpirationReqImpl(pReq, &alterReq);
  if (code == 0) {
    char detail[128];
    snprintf(detail, sizeof(detail), "%d DAYS %s", alterReq.days, alterReq.strategy);
    auditRecord(pReq, pMnode->clusterId, "alterKeyExpiration", "", detail, alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Builds without enterprise key support reject the request
  mError("key expiration management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("failed to alter key expiration, days:%d, strategy:%s, since %s", alterReq.days, alterReq.strategy,
           tstrerror(code));
  }

  tFreeSMAlterKeyExpirationReq(&alterReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc