• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5048

10 May 2026 03:11AM UTC coverage: 73.222% (+0.07%) from 73.152%
#5048

push

travis-ci

web-flow
merge: from main to 3.0 branch #35290

353 of 452 new or added lines in 9 files covered. (78.1%)

587 existing lines in 140 files now uncovered.

278189 of 379928 relevant lines covered (73.22%)

135206397.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.52
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
/*
 * Text for each dnode offline reason. Entries are positional: the numeric
 * reason value is used directly as the index into this table.
 */
static const char *offlineReason[] = {
    /*  0 */ "",
    /*  1 */ "status msg timeout",
    /*  2 */ "status not received",
    /*  3 */ "version not match",
    /*  4 */ "dnodeId not match",
    /*  5 */ "clusterId not match",
    /*  6 */ "statusInterval not match",
    /*  7 */ "timezone not match",
    /*  8 */ "locale not match",
    /*  9 */ "charset not match",
    /* 10 */ "ttlChangeOnWrite not match",
    /* 11 */ "enableWhiteList not match",
    /* 12 */ "encryptionKey not match",
    /* 13 */ "monitor not match",
    /* 14 */ "monitor switch not match",
    /* 15 */ "monitor interval not match",
    /* 16 */ "monitor slow log threshold not match",
    /* 17 */ "monitor slow log sql max len not match",
    /* 18 */ "monitor slow log scope not match",
    /* 19 */ "unknown",
};
67

68
/* Two constants with values 0 and 1; their use lies outside this section. */
enum {
  DND_ACTIVE_CODE = 0,
  DND_CONN_ACTIVE_CODE = 1,
};
72

73
/* Three constants with values 0, 1 and 2; their use lies outside this section. */
enum {
  DND_CREATE = 0,
  DND_ADD = 1,
  DND_DROP = 2,
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq);
110

111
#ifdef _GRANT
112
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
113
#else
114
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
115
#endif
116

117
/*
 * Wires the dnode module into the mnode: registers every message handler and
 * show-table callback implemented in this file, then installs the SDB table
 * description for SDB_DNODE.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitDnode(SMnode *pMnode) {
  /* Request / response handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  /* SHOW retrieval callbacks and their iterator-cancel counterparts. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  /* Key synchronisation and key management handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_KEY_EXPIRATION, mndProcessAlterKeyExpirationReq);

  /* SDB callbacks for dnode rows, keyed by a 32-bit integer. */
  SSdbTable dnodeTable = {
      .sdbType = SDB_DNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
156

157
/* Intentionally empty: this function performs no work. */
void mndCleanupDnode(SMnode *pMnode) { (void)pMnode; }
514,528✔
158

159
/*
 * Deploy callback for the dnode table: builds dnode #1 from the local
 * fqdn/port (and machine id when available) and submits it through a
 * "create-dnode" transaction.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    // Never report success on this path, even if tGetMachineId() returned 0
    // while leaving the id NULL.
    if (code == 0) code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  // After a successful append the raw is released together with the
  // transaction (the success path never frees it through pRaw), so drop our
  // reference now. Otherwise a failure below would free it twice at _OVER.
  SSdbRaw *pAppended = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppended, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
210

211
/*
 * Serialises a dnode object into a freshly allocated SDB raw record.
 *
 * Layout: id, createdTime, updateTime, port, fqdn, machineId, a reserved
 * area, then two zero int16 values kept for forward/backward compatibility.
 *
 * Returns the raw on success (terrno == 0), or NULL with terrno set.
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Pessimistic default; cleared only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
243

244
/*
 * Rebuilds a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed binary fields follow the reserved area; they are read and
 * discarded. After decoding, tmsgUpdateDnodeInfo() is called with the decoded
 * id/fqdn/port and a non-zero result is logged as an endpoint change.
 *
 * Returns the row on success (terrno == 0), or NULL with terrno set.
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Pessimistic default; cleared only after all fields were read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two (length, bytes) pairs whose payload is skipped.
    int16_t keyLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
295

296
/*
 * SDB insert hook: marks the dnode as "status not received" and fills in its
 * "fqdn:port" endpoint string. Always returns 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char epBuf[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;

  // Format into a zeroed scratch buffer first, then copy into the object.
  (void)snprintf(epBuf, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  tstrncpy(pDnode->ep, epBuf, TSDB_EP_LEN);
  return 0;
}
305

306
/* SDB delete hook: only emits a trace line. Always returns 0. */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
310

311
/*
 * SDB update hook: carries updateTime (and, in TD_ENTERPRISE / TD_ASTRA_TODO
 * builds, machineId) from the new row into the existing one. Always returns 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif

  return 0;
}
319

320
/*
 * Looks up a dnode by id through sdbAcquire().
 *
 * On failure returns NULL and translates the SDB error in terrno into the
 * dnode-specific one:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_MND_DNODE_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_DNODE_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_DNODE_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * A non-NULL result is handed back with mndReleaseDnode().
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode == NULL) {
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
    } else {
      // Log while terrno still holds the underlying cause; it is replaced by
      // the generic application error only afterwards.
      mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, terrstr());
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  return pDnode;
}
338

339
/* Hands a dnode object back to the SDB via sdbRelease(). */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
205,157,054✔
343

344
/*
 * Builds an endpoint set containing the dnode's fqdn/port.
 * The result of addEpIntoEpSet() is stored in terrno; the set is returned
 * by value regardless of that result.
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet result = {0};
  terrno = addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port);
  return result;
}
349

350
/*
 * Same as mndGetDnodeEpset() but starting from a dnode id.
 * Returns a zeroed endpoint set when the dnode cannot be acquired.
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SEpSet     epSet = {0};
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);

  if (pDnode != NULL) {
    epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
  }
  return epSet;
}
360

361
/*
 * Scans the dnode table with sdbFetch() for an entry whose endpoint string
 * equals pEpStr (case-insensitive, at most TSDB_EP_LEN characters).
 *
 * Returns the still-acquired dnode on a match; otherwise NULL with terrno set
 * to TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SDnodeObj *pDnode = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode)) != NULL) {
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
      // Stop iterating but keep the reference for the caller.
      sdbCancelFetch(pSdb, pIter);
      return pDnode;
    }

    sdbRelease(pSdb, pDnode);
    pDnode = NULL;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
381

382
/*
 * Like mndAcquireDnodeByEp() but iterates with sdbFetchAll(), which also
 * reports each object's SDB status. Returns the still-acquired dnode on a
 * match, or NULL when no endpoint matches (terrno is not touched here).
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;

    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pDnode);
      continue;
    }

    // Match: stop iterating but keep the reference for the caller.
    sdbCancelFetch(pSdb, pIter);
    return pDnode;
  }
}
402

403
/* Returns sdbGetSize() for the SDB_DNODE table. */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
407

408
/* Returns sdbGetSize() for the SDB_DB table. */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
412

413
/*
 * A dnode counts as online when the absolute distance between its
 * lastAccessTime and curMs does not exceed tsStatusTimeoutMs.
 *
 * Side effect: when it is found offline, has a positive rebootTime and was
 * still marked DND_REASON_ONLINE, its offlineReason becomes
 * DND_REASON_STATUS_MSG_TIMEOUT.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t elapsed = TABS(pDnode->lastAccessTime - curMs);
  if (elapsed <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
423

424
/*
 * Appends one SDnodeEp (id, fqdn, port, isMnode flag) per dnode in the SDB,
 * iterating with sdbFetchAll(), to pDnodeEps.
 *
 * A failed push is logged and the scan continues; nothing is returned.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the row before giving the reference back.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,398,770✔
450

451
/*
 * Appends one SDnodeInfo (id, fqdn, port, offlineReason, isMnode flag) per
 * dnode in the SDB, iterating with sdbFetchAll(), to pDnodeInfo.
 *
 * Returns 0 on success. If a push fails, the iteration is cancelled and the
 * terrno observed at that point is returned.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so that fields not assigned below (and any padding)
    // are not copied into the array as indeterminate bytes.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // Use the copied id: pDnode must not be dereferenced after sdbRelease().
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
483

484
/*
 * Compares pCfg->monitorParas.<para> with the like-named local/global <para>.
 * On mismatch: logs, sets terrno to TSDB_CODE_DNODE_INVALID_MONITOR_PARAS and
 * makes the ENCLOSING function return <err> (an offline-reason value).
 *
 * Expects `pCfg` and `pDnode` to be in scope at the expansion site.
 * Wrapped in do { } while (0) so it acts as a single statement and is safe
 * inside unbraced if/else. terrno receives an error code rather than the
 * offline-reason value, matching the other checks in mndCheckClusterCfgPara.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != para) {                                                                 \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return err;                                                                                          \
    }                                                                                                      \
  } while (0)
490

491
/*
492
 * Cached timezone offset to avoid DST-transition race:
493
 * dnode computes offset at send-time; if mnode recomputes
494
 * at receive-time the value may differ during the ~1 s
495
 * DST switchover, causing a spurious timezone mismatch.
496
 *
497
 * Keep both the previous and current cached offsets so
498
 * that during a DST transition window dnodes reporting
499
 * either the old or the new offset are both accepted.
500
 * Refresh at most once per 60 s; the previous offset
501
 * is only accepted for a short grace period (3 min)
502
 * to avoid permanently weakening the check.
503
 */
504
/* Minimum age (ms) of the cached offset before a routine refresh is attempted. */
#define TZ_CACHE_REFRESH_MS  60000
/* How long (ms) after a change the previous offset is still accepted. */
#define TZ_PREV_GRACE_MS    180000

/* Most recently computed local timezone offset. */
static int64_t tsCachedTzOffset     = 0;
/* Offset that was cached before the last observed change. */
static int64_t tsCachedTzOffsetPrev = 0;
/* Timestamp (ms) of the last successful refresh; 0 = never refreshed. */
static int64_t tsCachedTzOffsetMs   = 0;
/* Timestamp (ms) at which tsCachedTzOffsetPrev was recorded. */
static int64_t tsCachedPrevSetMs    = 0;
/* Non-zero once tsCachedTzOffsetPrev holds a value. */
static int8_t  tsCachedHasPrev      = 0;
/*
 * Seqlock sequence guarding the fields above:
 * even = stable, odd = write in progress.
 * A CAS from even to odd grants single-writer access;
 * storing even+2 publishes the new snapshot.
 */
static int64_t tsTzSeq              = 0;
517

518
/* Value copy of the timezone-offset cache taken by mndReadTzSnapshot(). */
typedef struct {
  int64_t offset;      /* copy of tsCachedTzOffset */
  int64_t offsetPrev;  /* copy of tsCachedTzOffsetPrev */
  int64_t refreshMs;   /* copy of tsCachedTzOffsetMs */
  int64_t prevSetMs;   /* copy of tsCachedPrevSetMs */
  int8_t  hasPrev;     /* copy of tsCachedHasPrev */
} STzSnapshot;
525

526
/*
527
 * Read a consistent snapshot of the tz cache.
528
 * Spins while a writer is active (odd seq) or
529
 * if the snapshot was torn (seq changed).
530
 */
531
static void mndReadTzSnapshot(STzSnapshot *s) {
×
532
  int64_t seq;
533
  do {
534
    seq = atomic_load_64(&tsTzSeq);
×
535
    if (seq & 1) continue;
×
536
    s->offset     = atomic_load_64(&tsCachedTzOffset);
×
537
    s->offsetPrev = atomic_load_64(&tsCachedTzOffsetPrev);
×
538
    s->refreshMs  = atomic_load_64(&tsCachedTzOffsetMs);
×
539
    s->prevSetMs  = atomic_load_64(&tsCachedPrevSetMs);
×
540
    s->hasPrev    = atomic_load_8(&tsCachedHasPrev);
×
541
  } while (atomic_load_64(&tsTzSeq) != seq);
×
542
}
×
543

544
/*
545
 * Try to refresh the tz cache.  Uses CAS on tsTzSeq
546
 * to ensure single-writer; if another thread is
547
 * already refreshing, this is a harmless no-op.
548
 */
549
static void mndRefreshTzCache(int64_t nowMs) {
×
550
  int64_t seq = atomic_load_64(&tsTzSeq);
×
551
  if (seq & 1) return;
×
552
  if (atomic_val_compare_exchange_64(&tsTzSeq, seq, seq + 1) != seq) {
×
553
    return;
×
554
  }
555

556
  /* seq is now odd — we are the sole writer */
557
  int32_t code = TSDB_CODE_SUCCESS;
×
558
  int64_t offset = (int64_t)taosGetLocalTimezoneOffset(&code);
×
559
  if (code != TSDB_CODE_SUCCESS) {
×
560
    mError("failed to get local timezone offset since %s", tstrerror(code));
×
561
    /* rollback: restore even seq */
562
    atomic_store_64(&tsTzSeq, seq);
×
563
    return;
×
564
  }
565

566
  int64_t oldMs = atomic_load_64(&tsCachedTzOffsetMs);
×
567
  int64_t oldOff = atomic_load_64(&tsCachedTzOffset);
×
568
  if (oldMs != 0 && offset != oldOff) {
×
569
    /* offset changed (DST edge) — keep old one */
570
    atomic_store_64(&tsCachedTzOffsetPrev, oldOff);
×
571
    atomic_store_64(&tsCachedPrevSetMs, nowMs);
×
572
    atomic_store_8(&tsCachedHasPrev, 1);
×
573
  }
574
  atomic_store_64(&tsCachedTzOffset, offset);
×
575
  atomic_store_64(&tsCachedTzOffsetMs, nowMs);
×
576

577
  /* publish: even seq+2 => readers see new state */
578
  atomic_store_64(&tsTzSeq, seq + 2);
×
579
}
580

581
static bool mndMatchTzSnapshot(const STzSnapshot *s, int64_t dnodeOff,
×
582
                               int64_t nowMs) {
583
  if (dnodeOff == s->offset) return true;
×
584
  if (s->hasPrev && nowMs - s->prevSetMs < TZ_PREV_GRACE_MS &&
×
585
      dnodeOff == s->offsetPrev) {
×
586
    return true;
×
587
  }
588
  return false;
×
589
}
590

591
static bool mndCheckTimezoneOffset(int64_t dnodeOffset) {
×
592
  STzSnapshot snap;
×
593
  int64_t     nowMs = taosGetTimestampMs();
×
594

595
  mndReadTzSnapshot(&snap);
×
596

597
  if (snap.refreshMs == 0 || nowMs - snap.refreshMs >= TZ_CACHE_REFRESH_MS) {
×
598
    mndRefreshTzCache(nowMs);
×
599
    mndReadTzSnapshot(&snap);
×
600
  }
601

602
  if (mndMatchTzSnapshot(&snap, dnodeOffset, nowMs))
×
603
    return true;
×
604

605
  /*
606
   * dnodeOffset doesn't match — force an immediate
607
   * refresh in case we are stale after a DST switch,
608
   * then re-check.
609
   */
610
  mndRefreshTzCache(nowMs);
×
611
  mndReadTzSnapshot(&snap);
×
612

613
  return mndMatchTzSnapshot(&snap, dnodeOffset, nowMs);
×
614
}
615

616
/*
 * Compares the cluster configuration reported by a dnode (pCfg) with this
 * node's own settings, in order: monitor parameters, slow-log except-db,
 * timezone, locale, charset, ttlChangeOnWrite, enableWhiteList and — unless
 * an encryption operation is in progress — the encryption key state/checksum.
 *
 * Returns DND_REASON_ONLINE when everything matches; otherwise logs, sets
 * terrno and returns the DND_REASON_* value of the first mismatch.
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  // Each macro below returns from this function on mismatch.
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  /*
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
           tsStatusIntervalMs);
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
  }
  */

  // A differing timezone string is tolerated when the reported offset
  // (pCfg->checkTime) passes the cached-offset check.
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0 && !mndCheckTimezoneOffset(pCfg->checkTime)) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64
           " inconsistent with cluster %s",
           pDnode->id, pCfg->timezone, pCfg->checkTime, tsTimezoneStr);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalise the local switch to 0/1 before comparing.
  int8_t enable = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != enable) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  // The key comparison is skipped while pMnode->encryptMgmt.encrypting is set.
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
684

685
/*
 * Average increase of a counter per millisecond between two samples.
 *
 * Returns (currentCount - lastCount) / (currentTimeMs - lastTimeMs) as a
 * double, or 0.0 when either the time or the count did not strictly advance.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    int64_t countGain = currentCount - lastCount;
    int64_t elapsedMs = currentTimeMs - lastTimeMs;
    return (double)countGain / (double)elapsedMs;
  }
  return 0.0;
}
695

696
/*
 * Folds a vnode load report (pVload) into the cached replica entry (pGid).
 *
 * - While commit index is ahead of applied index, maintains appliedRate and
 *   the timestamp of the last applied-index change.
 * - Always copies the progress counters (applied/commit/total index, buffer
 *   segment numbers, learner progress, snapshot sequence).
 * - Copies the sync state fields only when role, restore flag, can-read flag
 *   or start time differ from the cached ones.
 *
 * Returns true exactly when that last group of fields was updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // Evaluated against the cached values before anything below overwrites them.
  bool stateDiffers = pGid->syncState != pVload->syncState;
  bool termDiffers = (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm);
  bool roleTimeDiffers = pGid->roleTimeMs != pVload->roleTimeMs;
  bool roleChanged = stateDiffers || termDiffers || roleTimeDiffers;

  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
      // First observation with a backlog: only start the clock.
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
      int64_t nowMs = taosGetTimestampMs();
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs,
                                          pGid->lastSyncAppliedIndexUpdateTime);

      pGid->lastSyncAppliedIndexUpdateTime = nowMs;
    }
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;
  pGid->syncTotalIndex = pVload->syncTotalIndex;

  bool snapshotRunning = (pVload->snapSeq > 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END);
  if (snapshotRunning || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  bool changed = roleChanged || pGid->syncRestore != pVload->syncRestore ||
                 pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs;
  if (!changed) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
745

746
// Folds the sync information reported in a status message into the cached mnode object.
//
// The role is considered changed when syncState or roleTimeMs differ, or when the reported
// syncTerm is not -1 and differs from the cached one. If the role changed, or syncRestore
// differs, the reported syncState, syncTerm, syncRestore and roleTimeMs are copied into pObj
// and the transition is logged.
//
// Returns true when pObj was updated, false when it was left untouched.
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // A reported term of -1 is never treated as a term change.
  const bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleDiffers =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);

  if (!roleDiffers && pObj->syncRestore == pMload->syncRestore) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
763

764
extern char   *tsMonFwUri;
765
extern char   *tsMonSlowLogUri;
766
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
320✔
767
  SMnode    *pMnode = pReq->info.node;
320✔
768
  SStatisReq statisReq = {0};
320✔
769
  int32_t    code = -1;
320✔
770

771
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
320✔
772

773
  if (tsMonitorLogProtocol) {
320✔
UNCOV
774
    mInfo("process statis req,\n %s", statisReq.pCont);
×
775
  }
776

777
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
320✔
778
    monSendContent(statisReq.pCont, tsMonFwUri);
320✔
779
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
780
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
781
  }
782

783
  tFreeSStatisReq(&statisReq);
320✔
784
  return 0;
320✔
785
}
786

787
// Handles a single audit record message.
//
// Nothing is decoded unless auditing is enabled (tsEnableAudit) and tsAuditLevel is at least
// AUDIT_LEVEL_DATA. Otherwise the request is decoded and handed to auditAddRecord() together
// with this mnode's clusterId, then released.
//
// Returns 0, or leaves early through TAOS_CHECK_RETURN when decoding fails.
// NOTE(review): the early return on decode failure skips tFreeSAuditReq - confirm the decoder
// cannot leave an allocation behind on that path.
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SAuditReq record = {0};
  TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &record));

  mDebug("received audit req:%s, %s, %s, %s", record.operation, record.db, record.table, record.pSql);

  SMnode *pMnode = pReq->info.node;
  auditAddRecord(pReq, pMnode->clusterId, record.operation, record.db, record.table, record.pSql, record.sqlLen,
                 record.duration, record.affectedRows);

  tFreeSAuditReq(&record);
  return 0;
}
804

805
// Handles a batch of audit records carried in one message.
//
// Nothing is decoded unless auditing is enabled (tsEnableAudit) and tsAuditLevel is at least
// AUDIT_LEVEL_DATA. Otherwise every element of the decoded auditArr is passed to
// auditAddRecord() with this mnode's clusterId, in array order, and the batch is released.
//
// Returns 0, or leaves early through TAOS_CHECK_RETURN when decoding fails.
// NOTE(review): the early return on decode failure skips tFreeSBatchAuditReq - confirm the
// decoder cannot leave allocations behind on that path.
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);

  if (!tsEnableAudit || tsAuditLevel < AUDIT_LEVEL_DATA) {
    return 0;
  }

  SBatchAuditReq batch = {0};
  TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &batch));

  SMnode *pMnode = pReq->info.node;
  int32_t total = taosArrayGetSize(batch.auditArr);

  for (int32_t idx = 0; idx < total; ++idx) {
    SAuditReq *item = TARRAY_GET_ELEM(batch.auditArr, idx);
    mDebug("received audit req:%s, %s, %s, %s", item->operation, item->db, item->table, item->pSql);

    auditAddRecord(pReq, pMnode->clusterId, item->operation, item->db, item->table, item->pSql, item->sqlLen,
                   item->duration, item->affectedRows);
  }

  tFreeSBatchAuditReq(&batch);
  return 0;
}
827

828
// Queues a TDMT_MND_UPDATE_DNODE_INFO message on the mnode write queue so that the dnode
// object is updated through a transaction.
//
// The message carries the dnode id and the machine id currently stored in pDnode. The payload
// is serialized twice: once with a NULL buffer to learn its size, then into a buffer obtained
// from rpcMallocCont().
//
// Returns 0 when the message was queued, otherwise an error code (also stored in terrno by
// TAOS_RETURN).
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // Sizing pass: no buffer yet.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so it has to be released here. A zero length
    // is mapped to an error as in the sizing pass; otherwise the function would report success
    // without having queued anything.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  // NOTE(review): presumably tmsgPutToQueue disposes of rpcMsg.pCont itself when it fails -
  // confirm before adding a free on this path.
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
858

859
// Handles TDMT_MND_UPDATE_DNODE_INFO: persists the current dnode object through a transaction.
//
// Steps: decode the request, acquire the dnode named by infoReq.dnodeId, create an
// "update-dnode-obj" transaction, stamp pDnode->updateTime with the current time, encode the
// dnode as a commit log marked SDB_STATUS_READY, and prepare the transaction.
//
// The dnode reference and the transaction handle are always released before returning.
// Returns 0 on success, otherwise the failing step's error code (stored in terrno by
// TAOS_RETURN).
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;
  int32_t       code = 0;
  int32_t       lino = 0;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  pDnode->updateTime = taosGetTimestampMs();

  pCommitRaw = mndDnodeActionEncode(pDnode);
  if (pCommitRaw == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was not attached to the transaction, so it is freed here.
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    TAOS_CHECK_EXIT(code);
  }
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
904

905
// Handles a dnode status (heartbeat) message.
//
// Outline of what the code below does:
//   1. Decode the request and reject it when it names a different, non-zero cluster id.
//   2. Resolve the dnode object: by endpoint when the request carries dnodeId 0, otherwise by
//      id with endpoint-based fallbacks.
//   3. Compare versions/flags carried by the request (dnode version, reboot time, supported
//      vnodes, encryption key checksum, whitelist versions, anode version, audit db/token/epset)
//      with the mnode's view to decide whether a full check plus response is needed (needCheck).
//   4. Reject requests whose timestamp differs from local time by tsTimestampDeltaLimit seconds
//      or more.
//   5. Apply the reported vnode, mnode and qnode loads to the cached objects.
//   6. When needCheck is set: validate software version and cluster id, refresh the dnode's
//      cached properties, and build the SStatusRsp attached to pReq->info.rsp.
//   7. Record the access (accessTimes, lastAccessTime, offlineReason).
//
// Returns the error code of the first failing step, or the result of mndUpdClusterInfo() when
// everything succeeded.
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;
  int32_t    lino = 0;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), &lino, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  if (statusReq.dnodeId == 0) {
    // No id assigned yet on the sender side: look the dnode up by its endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      // Keep the lookup error; the fallback lookups below may overwrite terrno.
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        // code still holds its initial value (-1) on this exit.
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditInfoChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  SDbObj *pDb = NULL;
  if (tsAuditUseToken || tsAuditSaveInSelf) {
    pDb = mndAcquireAuditDb(pMnode);
  }
  if (tsAuditUseToken) {
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = 0;
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      if ((ret = mndGetUserActiveToken(auditUser, auditToken)) != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:xxxx, since %s", pDnode->id, tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:xxxx", pDnode->id);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditInfoChanged = true;
      }
    }
  }

  SEpSet  auditVnodeEpSet = {0};
  int32_t auditVgId = 0;
  if (tsAuditSaveInSelf) {
    if (pDb != NULL) {
      // Use the first vgroup found that belongs to the audit db.
      void   *pIter = NULL;
      SVgObj *pVgroup = NULL;
      while (1) {
        pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) break;

        if (mndVgroupInDb(pVgroup, pDb->uid)) {
          auditVnodeEpSet = mndGetVgroupEpset(pMnode, pVgroup);
          auditVgId = pVgroup->vgId;
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pVgroup);
          break;
        }
        sdbRelease(pMnode->pSdb, pVgroup);
      }
    }

    if (auditVnodeEpSet.numOfEps != statusReq.auditEpSet.numOfEps) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit epset num changed, auditNum:%d, inReq:%d", pDnode->id, auditVnodeEpSet.numOfEps,
             statusReq.auditEpSet.numOfEps);
    } else {
      for (int32_t i = 0; i < auditVnodeEpSet.numOfEps; i++) {
        if (strncmp(auditVnodeEpSet.eps[i].fqdn, statusReq.auditEpSet.eps[i].fqdn, TSDB_FQDN_LEN) != 0 ||
            auditVnodeEpSet.eps[i].port != statusReq.auditEpSet.eps[i].port) {
          // do not need to check InUse here, because inUse is not accurate at every time
          auditInfoChanged = true;
          mTrace("dnode:%d, audit epset changed at item:%d, fqdn:%s:%d:, inReq:%s:%d", pDnode->id, i,
                 auditVnodeEpSet.eps[i].fqdn, auditVnodeEpSet.eps[i].port, statusReq.auditEpSet.eps[i].fqdn,
                 statusReq.auditEpSet.eps[i].port);
          break;
        }
      }
    }

    if (auditVgId != statusReq.auditVgId) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit vgId changed, auditVgId:%d, inReq:%d", pDnode->id, auditVgId, statusReq.auditVgId);
    }
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditInfoChanged;
  // NOTE(review): `trace` is not referenced by name below; presumably the mGTrace macro picks
  // it up - do not remove without checking the macro.
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) {
    (void)formatTimestampLocal(timestamp, sizeof(timestamp), statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  }
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // Clock skew in whole seconds between this mnode and the sender. llabs() is used because
  // delta is 64-bit; labs() takes a long, which is narrower than int64_t on some platforms.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (llabs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%" PRId64 " since %s, limit %" PRId64 "s", statusReq.dnodeId,
           pMnode->clusterId, tstrerror(code), tsTimestampDeltaLimit);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  int32_t numOfVloads = taosArrayGetSize(statusReq.pVloads);
  for (int32_t v = 0; v < numOfVloads; ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Vgroup-wide statistics are only taken from the replica reporting itself as leader.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Unset start/role times fall back to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pVgDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pVgDb != NULL && pVgDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pVgDb->name,
                pVgDb->stateTs, curMs);
          pVgDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pVgDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  if (needCheck) {
    if (statusReq.sver != tsVersion) {
      if (pDnode != NULL) {
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      }
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      code = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        if (pDnode != NULL) {
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        }
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        code = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
            "enableWhiteListChanged:%d auditDBChanged:%d auditInfoChanged:%d pMnode->ipWhiteVer:%" PRId64
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditInfoChanged,
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // The machine id changed: cache it and queue a message to persist the dnode object.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditInfoChanged || auditDBChanged) {
      if (tsAuditUseToken) {
        if (auditDB[0] != '\0') {
          mInfo("dnode:%d, set audit db:%s in process status rsp", statusReq.dnodeId, auditDB);
          tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
        }
        if (auditToken[0] != '\0') {
          mInfo("dnode:%d, set audit token:xxxx in process status rsp", statusReq.dnodeId);
          tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
        }
      }

      if (tsAuditSaveInSelf) {
        mInfo("dnode:%d, set audit epset and vgId:%d in process status rsp", statusReq.dnodeId, auditVgId);
        statusRsp.auditEpSet = auditVnodeEpSet;
        statusRsp.auditVgId = auditVgId;
      }
    }

    // Serialize the response: a sizing pass first, then into an rpc buffer. Every failure
    // releases what was acquired so far so that no half-built response is published.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Read terrno before any cleanup call has a chance to change it.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      taosArrayDestroy(statusRsp.pDnodeEps);
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      // The buffer has not been attached to pReq yet, so it must be freed here.
      rpcFreeCont(pHead);
      code = contLen;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req at line:%d since %s", statusReq.dnodeId, lino, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1256

1257
// Handles a dnode notify message carrying lightweight per-vnode loads.
//
// After decoding, a request naming a different (non-zero) cluster id is rejected with
// TSDB_CODE_MND_DNODE_DIFF_CLUSTER. Otherwise each reported nTimeSeries is copied into the
// matching vgroup's numOfTimeSeries (unknown vgroups are skipped) and mndUpdClusterInfo() is
// called; its result becomes the return value.
//
// The decoded request is released on every path. A decode failure is also stored in terrno.
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);

    if (notifyReq.clusterId != 0 && notifyReq.clusterId != localClusterId) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
            notifyReq.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t numOfLoads = taosArrayGetSize(notifyReq.pVloads);

      for (int32_t idx = 0; idx < numOfLoads; ++idx) {
        SVnodeLoadLite *pLoad = taosArrayGet(notifyReq.pVloads, idx);
        SVgObj         *pVgroup = mndAcquireVgroup(pMnode, pLoad->vgId);
        if (pVgroup == NULL) {
          continue;
        }
        pVgroup->numOfTimeSeries = pLoad->nTimeSeries;
        mndReleaseVgroup(pMnode, pVgroup);
      }

      code = mndUpdClusterInfo(pReq);
    }
  }

  tFreeSNotifyReq(&notifyReq);
  return code;
}
1290

1291
// Creates a new dnode object through a "create-dnode" transaction.
//
// The new object takes its id from sdbGetMaxId(SDB_DNODE), uses the current time for both
// createdTime and updateTime, and its endpoint string is built as "<fqdn>:<port>" from
// pCreate. The encoded object is appended to the transaction as a commit log marked
// SDB_STATUS_READY, then the transaction is prepared.
//
// Returns 0 when the transaction was prepared, otherwise an error code. The transaction handle
// is always dropped before returning.
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;       // encoded dnode, owned by this function until appended
  SSdbRaw *pAppended = NULL;  // same raw after the transaction has taken it over
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  // Once appended, the raw is no longer freed by this function on the success path, so it is
  // treated as belonging to the transaction from here on. Clearing pRaw immediately keeps
  // _OVER from freeing it a second time should sdbSetRawStatus fail.
  pAppended = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppended, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
1331

1332
// Handles a dnode list request: replies with one SDNodeAddr (id plus a single-endpoint epset
// built from fqdn and port) for every dnode object fetched from the sdb.
//
// On success the serialized SDnodeListRsp is attached to pReq->info.rsp / rspLen and 0 is
// returned. On failure nothing is attached and an error code is returned (stored in terrno by
// TAOS_RETURN). The temporary response structure is released on every path.
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      // Leaving the iteration early: drop the current object and cancel the fetch.
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Sizing pass; a negative result is an error and must not reach rpcMallocCont().
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // Fall back to an explicit error so a zero terrno cannot turn this into a success return.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer was not attached to pReq, so release it here. A zero length is reported as
    // an error rather than as success without a response.
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1393

1394
void getSlowLogScopeString(int32_t scope, char *result) {
1,393✔
1395
  if (scope == SLOW_LOG_TYPE_NULL) {
1,393✔
1396
    (void)strncat(result, "NONE", 64);
×
1397
    return;
×
1398
  }
1399
  while (scope > 0) {
2,786✔
1400
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,393✔
1401
      (void)strncat(result, "QUERY", 64);
1,393✔
1402
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,393✔
1403
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1404
      (void)strncat(result, "INSERT", 64);
×
1405
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1406
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1407
      (void)strncat(result, "OTHERS", 64);
×
1408
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1409
    } else {
1410
      (void)printf("invalid slow log scope:%d", scope);
×
1411
      return;
×
1412
    }
1413

1414
    if (scope > 0) {
1,393✔
1415
      (void)strncat(result, "|", 64);
×
1416
    }
1417
  }
1418
}
1419

1420
// Handle a "create dnode" request.
//
// Steps, in order: grant checks, request deserialization, privilege check,
// endpoint validation, duplicate-endpoint check, then mndCreateDnode(). When
// mndCreateDnode() succeeds the function reports TSDB_CODE_ACTION_IN_PROGRESS
// and sets tsGrantHBInterval to 5. An audit record is written (when the audit
// level allows it) once mndCreateDnode() has been attempted, whatever its
// result.
//
// Returns the resulting code through TAOS_RETURN().
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SDnodeObj      *pDnode = NULL;
  SCreateDnodeReq createReq = {0};
  int32_t         code = -1;
  int32_t         lino = 0;
  const int64_t   startMs = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE),
                  &lino, _OVER);

  // The endpoint needs a non-empty FQDN and a port in (0, UINT16_MAX].
  const bool validEp = (createReq.fqdn[0] != 0) && (createReq.port > 0) && (createReq.port <= UINT16_MAX);
  if (!validEp) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // The taosValidFqdn(tsEnableIpv6, createReq.fqdn) resolution check is currently disabled in this handler.

  char ep[TSDB_EP_LEN];
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);

  // Reject the request when a dnode with this endpoint is already registered.
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[200] = {0};
    (void)snprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed time is passed to the audit record in seconds.
    const int64_t endMs = taosGetTimestampMs();
    const double  duration = (double)(endMs - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1483

1484
// Implementation of the restore-dnode request. It is defined in this file
// only when TD_ENTERPRISE is not set; otherwise it must be supplied elsewhere.
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

// Entry point for a restore-dnode request: forwards to the implementation.
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  return mndProcessRestoreDnodeReqImpl(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0.
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1491

1492
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
11,231✔
1493
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1494
  int32_t  code = -1;
11,231✔
1495
  SSdbRaw *pRaw = NULL;
11,231✔
1496
  STrans  *pTrans = NULL;
11,231✔
1497
  int32_t  lino = 0;
11,231✔
1498

1499
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
11,231✔
1500
  if (pTrans == NULL) {
11,231✔
1501
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1502
    if (terrno != 0) code = terrno;
×
1503
    goto _OVER;
×
1504
  }
1505
  mndTransSetGroupParallel(pTrans);
11,231✔
1506
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
11,231✔
1507
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
11,231✔
1508

1509
  pRaw = mndDnodeActionEncode(pDnode);
11,231✔
1510
  if (pRaw == NULL) {
11,231✔
1511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1512
    if (terrno != 0) code = terrno;
×
1513
    goto _OVER;
×
1514
  }
1515
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
11,231✔
1516
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
11,231✔
1517
  pRaw = NULL;
11,231✔
1518

1519
  pRaw = mndDnodeActionEncode(pDnode);
11,231✔
1520
  if (pRaw == NULL) {
11,231✔
1521
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1522
    if (terrno != 0) code = terrno;
×
1523
    goto _OVER;
×
1524
  }
1525
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
11,231✔
1526
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
11,231✔
1527
  pRaw = NULL;
11,231✔
1528

1529
  if (pSObj != NULL) {
11,231✔
1530
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
673✔
1531
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
673✔
1532
  }
1533

1534
  if (pMObj != NULL) {
11,231✔
1535
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
281✔
1536
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
281✔
1537
  }
1538

1539
  if (pQObj != NULL) {
11,231✔
1540
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
181✔
1541
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
181✔
1542
  }
1543

1544
  if (pBObj != NULL) {
11,231✔
1545
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
931✔
1546
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
931✔
1547
  }
1548

1549
  if (numOfVnodes > 0) {
10,300✔
1550
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
8,185✔
1551
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
8,185✔
1552
  }
1553

1554
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
10,300✔
1555

1556
  code = 0;
10,300✔
1557

1558
_OVER:
11,231✔
1559
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
11,231✔
1560
  mndTransDrop(pTrans);
11,231✔
1561
  sdbFreeRaw(pRaw);
11,231✔
1562
  TAOS_RETURN(code);
11,231✔
1563
}
1564

1565
// Report whether the dnode hosts nothing: no qnode, no snode, no mnode and no
// vnodes. The checks run in that order and stop at the first hit. Whatever
// was acquired along the way is released before returning.
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, dnodeId);

  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        int32_t vnodeCount = mndGetVnodesNum(pMnode, dnodeId);
        empty = !(vnodeCount > 0);
      }
    }
  }

  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1590

1591
// Handle a "drop dnode" request.
//
// The dnode is looked up by dropReq.dnodeId first and, if that fails, by the
// "fqdn:port" endpoint carried in the request. The drop is rejected when:
//   - the dnode hosts the only mnode, or the mnode processing this request;
//   - (USE_MOUNT) a mount exists on the dnode and the drop is not forced;
//   - the dnode is online and the drop is forced;
//   - one of its vnodes is in offline sync state and the drop is not forced;
//   - the dnode is offline and the drop is not forced.
// Otherwise mndDropDnode() is called and, on success, the function reports
// TSDB_CODE_ACTION_IN_PROGRESS. `unsafe` implies `force`.
//
// Returns the resulting code through TAOS_RETURN(). All acquired objects and
// the decoded request are released on every path.
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  // NOTE(review): the privilege checked here is MND_OPER_DROP_MNODE although this handler drops a dnode;
  // confirm whether a dnode-specific operation code was intended.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL,
                  _OVER);

  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Keep the error of the id lookup; it is what gets reported if the endpoint lookup fails as well.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // The fqdn comes from the request and must never be used as the format string itself.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // From here on use the id of the dnode that was actually resolved: when it was found through its
  // endpoint, dropReq.dnodeId does not identify it.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  // NOTE(review): this and the per-vnode message below are diagnostics emitted at error level.
  mError("vnode num:%d", numOfVnodes);

  // Look for a vnode of this dnode whose sync state is offline; stop at the first one found.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early: the fetch cursor has to be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)snprintf(obj1, sizeof(obj1), "%d", dnodeId);

    // Elapsed time is passed to the audit record in seconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1727

1728
// Send a TDMT_DND_CREATE_ENCRYPT_KEY request to the online dnodes.
//
// pReq     - originating request (provides the mnode and the trace id)
// dnodeId  - target dnode id; -1 or 0 selects every online dnode
// pDcfgReq - config name/value pair to serialize into each request
//
// The key length is validated first. `encryptMgmt.encrypting` is then used as
// a guard so that only one key-creation round runs at a time, and the request
// is refused if a key is already set or loaded. `encryptMgmt.nEncrypt` counts
// the requests that were handed to tmsgSendReq() successfully; when none was
// sent the guard is cleared again before returning.
//
// Returns 0 on success or an error code; on error terrno is set to that code
// if it was still 0.
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Claim the guard: only the caller that flips it from 0 to 1 may proceed.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // Abort the round: free the unsent buffer, drop the object and cancel the fetch cursor, then
          // leave through the check below so the guard is cleared when nothing was dispatched.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No request is in flight, so no response will ever clear the guard: clear it here.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1805

1806
// Handle a create-encrypt-key request.
//
// With TD_ENTERPRISE or TD_ASTRA_TODO defined: the request is decoded, the
// MND_OPER_CONFIG_DNODE privilege is checked, and - if the config name is
// "encrypt_key" (case-insensitive) - the name/value pair is handed to
// mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
// TSDB_CODE_MND_INTERNAL_ERROR. Without those defines the function just
// returns 0.
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE);
  if (code != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  const STraceId *trace = &pReq->info.traceId;
  SDCfgDnodeReq   dcfgReq = {0};

  // The length 12 includes the terminating NUL, so the whole name has to match.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1835

1836
// Handle one dnode's response to a create-encrypt-key request.
//
// The response is counted as a success or a failure in encryptMgmt. When the
// number of responses received so far (successes + failures) reaches the
// number of requests sent (nEncrypt), the `encrypting` guard is cleared.
//
// Always returns 0.
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Bump the counter matching this response and read the other one, so that the completion test below
  // sees both totals. Looking only at the counter just incremented would never reach nReq when the
  // responses are a mix of successes and failures.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1860

1861
// Fill pBlock with a fixed list of (name, value) configuration rows.
//
// The rows are statusIntervalMs, timezone, locale, charset, monitor,
// monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope, in that
// order. Column 0 receives the name and column 1 the value, both as varstr.
//
// Returns the number of rows written, which is also added to
// pShow->numOfRows. If setting a cell fails, the error is logged and the
// count of rows completed before the failure is returned.
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  int32_t totalRows = 0;
  int32_t numOfRows = 0;
  int32_t cols = 0;
  int32_t code = 0;
  int32_t lino = 0;

  // Collect the name/value pairs first; totalRows advances once per pair.
  cfgOpts[totalRows] = "statusIntervalMs";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);

  cfgOpts[totalRows] = "timezone";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  cfgOpts[totalRows] = "locale";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  cfgOpts[totalRows] = "charset";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  cfgOpts[totalRows] = "monitor";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  cfgOpts[totalRows] = "monitorInterval";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  cfgOpts[totalRows] = "slowLogThreshold";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  cfgOpts[totalRows] = "slowLogMaxLen";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  // The scope bitmask is rendered as text before being stored.
  char scopeStr[64] = {0};
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
  cfgOpts[totalRows] = "slowLogScope";
  (void)snprintf(cfgVals[totalRows++], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);

  // Scratch buffers holding the varstr-encoded name and value of the current row.
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

  for (int32_t row = 0; row < totalRows; ++row) {
    SColumnInfoData *pColInfo = NULL;
    cols = 0;

    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[row], TSDB_CONFIG_OPTION_LEN);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[row], TSDB_CONFIG_VALUE_LEN);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);

    numOfRows++;
  }

_OVER:
  if (code != 0) {
    mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1932

1933
// Cancel callback for the config listing: there is nothing to release, so
// this is a no-op.
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1934

1935
// Fill pBlock with one row per dnode, up to `rows` rows per call.
//
// Columns, in order: id, endpoint, number of vnodes, supported vnodes, status,
// create time, reboot time, offline reason and - with TD_ENTERPRISE - the
// machine id. The status is "ready", "creating" or "dropping" according to
// the sdb object status; for a dnode that is not online it is "creating*",
// "dropping*" or "offline". The offline reason is empty for online dnodes.
//
// The fetch position is kept in pShow->pIter across calls. Returns the number
// of rows written, which is also added to pShow->numOfRows. When a cell cannot
// be set the error is logged and the rows completed so far are returned.
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    // A trailing '*' marks a dnode that is being created/dropped while not online.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason: heap buffer sized for the varstr header plus the text.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(reason) + 1, 1);
    if (b == NULL) {
      // Allocation failed: leave through the common error path instead of writing through NULL.
      RETRIEVE_CHECK_GOTO(terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the buffer before checking the result so that it is not leaked when the check jumps away.
    int32_t setCode = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(setCode, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
2017

2018
// Cancel callback for the dnode listing: cancels the SDB_DNODE fetch that
// pIter belongs to.
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
2022

2023
// Collect the FQDN of every dnode.
//
// Returns a new array of heap-allocated strings (one `char *` per dnode); the
// caller owns the array and each string. Returns NULL if the array cannot be
// created or if duplicating an FQDN fails; in the latter case everything
// collected so far is freed. A failure to append one entry is logged and that
// entry is skipped.
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error first, and never report success for a failed duplication.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      // Log while the object is still held, then release it and cancel the unfinished iteration.
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The array did not take the string, so it has to be freed here.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
2065

2066
// Handle a key-sync request from a dnode.
//
// With TD_ENTERPRISE and TD_HAS_TAOSK defined: the key version reported by the
// dnode is compared with the one held here (0 when no keys are loaded). On a
// mismatch the response carries the current keys, algorithm and timestamps
// with needUpdate = 1; otherwise only the version is returned with
// needUpdate = 0. Without those defines the response always carries version 0
// and needUpdate = 0.
//
// The serialized response is attached to pReq->info. Returns 0 on success or
// an error code.
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Snapshot of the keys held here; everything stays zero/empty when no keys are loaded.
  // NOTE(review): tsCfgAlgorithm, tsMetaAlgorithm and tsEncryptFileVersion used to be copied into
  // locals here but were never placed in the response; confirm they are not meant to be sent.
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
  int32_t algorithm = 0;
  int32_t keyVersion = 0;
  int64_t createTime = 0;
  int64_t svrKeyUpdateTime = 0;
  int64_t dbKeyUpdateTime = 0;

  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
    keyVersion = tsEncryptKeyVersion;
    tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
    tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
    algorithm = tsEncryptAlgorithmType;
    createTime = tsEncryptKeyCreateTime;
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
    dbKeyUpdateTime = tsDbKeyUpdateTime;
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
  } else {
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
  }

  // Check if dnode needs update
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);

    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 1;
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
    rsp.algorithm = algorithm;
    rsp.createTime = createTime;
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.keyVersion = keyVersion;
    rsp.needUpdate = 0;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass computes the encoded size, second pass writes into the response buffer.
  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
2176

2177
// Handler for the key-sync response message.
// The message is not inspected; the handler always reports success (0).
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
2178

2179
// Map an sdb object of the given type to its dnode object.
//   p    - object fetched from the sdb table `type`; may be NULL
//   type - SDB_DNODE returns p itself; SDB_QNODE / SDB_SNODE / SDB_BNODE
//          return the pDnode member of the respective object
// Returns NULL when p is NULL or the type is not one of the four above.
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pResult = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pResult = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pResult = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pResult = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pResult = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pResult;
}
2196
// Append the endpoint set of every object in sdb table `type` to `pAddr`.
//   pMnode - mnode whose sdb is scanned (must not be NULL)
//   type   - sdb table to walk; each row is resolved to a dnode object via
//            getDnodeObjByType()
//   pAddr  - array of SEpSet that receives one entry per resolved row
// Returns 0 on success, or the error that stopped the scan. Rows that cannot be
// resolved to a dnode object are logged and skipped.
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Always leave with a non-zero code: relying on terrno alone would let the
      // loop fall through and release pObj a second time if terrno were 0.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // The scan is abandoned mid-way, so the sdb iterator is cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2226

2227
// Collect the endpoint sets of all qnode, snode, bnode and dnode objects, in
// that order, into `pAddr`.
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
// result of the first per-type scan that fails, or 0 when all succeed.
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType scanOrder[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};

  int32_t code = 0;
  int32_t lino = 0;

  if (pMnode == NULL || pAddr == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
  }

  for (size_t idx = 0; idx < sizeof(scanOrder) / sizeof(scanOrder[0]); ++idx) {
    TAOS_CHECK_GOTO(mndGetAllNodeAddrByType(pMnode, scanOrder[idx], pAddr), &lino, _error);
  }

_error:
  return code;
}
2249

2250
// Handle a "reload dnode TLS" request: after deserializing the request and
// checking the MND_OPER_ALTER_DNODE_RELOAD_TLS privilege, send an empty
// TDMT_DND_RELOAD_DNODE_TLS message to every address returned by
// mndGetAllNodeAddr().
// Returns 0 on success. Deserialization, privilege, allocation and address
// collection failures abort the request. Send failures are logged and the
// remaining addresses are still tried; the first send error is returned.
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq req = {0};
  // Initialized before any `goto _OVER` so the cleanup path never reads an
  // indeterminate pointer.
  SArray *pAddr = NULL;

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = mndGetAllNodeAddr(pMnode, pAddr)) != 0) {
    mError("failed to collect node addrs for reload tls since %s", tstrerror(code));
    goto _OVER;
  }

  int32_t numOfAddr = (int32_t)taosArrayGetSize(pAddr);
  for (int32_t i = 0; i < numOfAddr; i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    int32_t ret = tmsgSendReq(pEpSet, &rpcMsg);
    if (ret != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(ret));
      // Keep going, but do not let a later successful send hide this failure.
      if (code == 0) code = ret;
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2288

2289
// Handler for the reload-TLS response: logs whether the response carries an
// error code. The handler itself always returns 0.
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
    return 0;
  }

  mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  return 0;
}
2298

2299
// Validate an alter-encrypt-key request and send it to every online dnode.
//   pReq      - originating rpc message (provides the mnode and trace id)
//   pAlterReq - deserialized request; keyType must be 0 (SVR_KEY) or 1 (DB_KEY)
//               and strlen(newKey) must be in [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN]
// Returns 0 when the request was built for every online dnode. Validation,
// serialization and allocation failures return an error and stop the
// distribution. A failed tmsgSendReq() is only logged and does not change the
// return value. On error, terrno is set to the code if it was still 0.
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // Referenced by the mG* logging macros below.
  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = (int32_t)strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes; the sql text is
  // deliberately left empty in the forwarded copy.
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // The payload is the same for every dnode, so its size is computed once.
  // A non-positive size is never a valid payload and is reported as an error.
  int32_t reqLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    // Each send gets its own buffer.
    void *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      sdbRelease(pSdb, pDnode);
      // Leaving the scan early: cancel the sdb iterator explicitly.
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, reqLen, &alterKeyReq);
    if (bufLen <= 0) {
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);  // not handed to the transport, so it is freed here
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
    // NOTE(review): pBuf is not freed after tmsgSendReq(); presumably the
    // transport takes ownership of pCont - confirm.
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2374

2375
// Entry point for the alter-encrypt-key request.
// Steps: check the MND_OPER_CONFIG_DNODE privilege, deserialize the request,
// then (enterprise builds with TD_HAS_TAOSK only) distribute it to the dnodes
// and write an audit record on success. Other builds return
// TSDB_CODE_OPS_NOT_SUPPORT.
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  SMAlterEncryptKeyReq keyReq = {0};
  SMnode              *pMnode = pReq->info.node;

  // Check privilege - only admin can alter encryption keys
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Decode the request body
  code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &keyReq);
  if (code != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", keyReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and fan out to the dnodes; audit only when that succeeded
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &keyReq);
  if (code == 0) {
    const char *keyName = "DB_KEY";
    if (keyReq.keyType == 0) {
      keyName = "SVR_KEY";
    }
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", keyName, keyReq.sql, keyReq.sqlLen, 0, 0);
  }
#else
  // Community edition - no encryption support
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter encrypt key, keyType:%d, since %s", keyReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&keyReq);
  TAOS_RETURN(code);
}
2416

2417
// Validate an alter-key-expiration request and send it to every online dnode.
//   pReq      - originating rpc message (provides the mnode and trace id)
//   pAlterReq - deserialized request; days must be >= 0 and strategy non-empty
// Returns 0 when the request was built for every online dnode. Validation,
// serialization and allocation failures return an error and stop the
// distribution. A failed tmsgSendReq() is only logged and does not change the
// return value. On error, terrno is set to the code if it was still 0.
static int32_t mndProcessAlterKeyExpirationReqImpl(SRpcMsg *pReq, SMAlterKeyExpirationReq *pAlterReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // Referenced by the mG* logging macros below.
  const STraceId *trace = &pReq->info.traceId;

  // Validate days value
  if (pAlterReq->days < 0) {
    mGError("msg:%p, failed to alter key expiration since invalid days:%d, must be >= 0", pReq, pAlterReq->days);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate strategy
  if (strlen(pAlterReq->strategy) == 0) {
    mGError("msg:%p, failed to alter key expiration since empty strategy", pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  // Prepare SMAlterKeyExpirationReq for distribution to dnodes; the sql text is
  // deliberately left empty in the forwarded copy.
  SMAlterKeyExpirationReq alterReq = {0};
  alterReq.days = pAlterReq->days;
  tstrncpy(alterReq.strategy, pAlterReq->strategy, sizeof(alterReq.strategy));
  alterReq.sqlLen = 0;
  alterReq.sql = NULL;

  // The payload is the same for every dnode, so its size is computed once.
  // A non-positive size is never a valid payload and is reported as an error.
  int32_t reqLen = tSerializeSMAlterKeyExpirationReq(NULL, 0, &alterReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter key_expiration req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    // Each send gets its own buffer.
    void *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      sdbRelease(pSdb, pDnode);
      // Leaving the scan early: cancel the sdb iterator explicitly.
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    int32_t bufLen = tSerializeSMAlterKeyExpirationReq(pBuf, reqLen, &alterReq);
    if (bufLen <= 0) {
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);  // not handed to the transport, so it is freed here
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);
      goto _exit;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_KEY_EXPIRATION, .pCont = pBuf, .contLen = bufLen};
    // NOTE(review): pBuf is not freed after tmsgSendReq(); presumably the
    // transport takes ownership of pCont - confirm.
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter key_expiration req to dnode:%d, error:%s", pReq, pDnode->id,
              tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter key_expiration req to dnode:%d, days:%d, strategy:%s", pReq, pDnode->id,
             pAlterReq->days, pAlterReq->strategy);
    }

    sdbRelease(pSdb, pDnode);
  }

  mGInfo("msg:%p, successfully sent alter key expiration request to all dnodes, days:%d, strategy:%s", pReq,
         pAlterReq->days, pAlterReq->strategy);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2490

2491
// Entry point for the alter-key-expiration request.
// Steps: check the MND_OPER_CONFIG_DNODE privilege, deserialize the request,
// then (enterprise builds with TD_HAS_TAOSK only) distribute it to the dnodes
// and write an audit record on success. Other builds return
// TSDB_CODE_OPS_NOT_SUPPORT.
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SMAlterKeyExpirationReq expReq = {0};
  SMnode                 *pMnode = pReq->info.node;

  // Check privilege - only admin can alter key expiration
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Decode the request body
  code = tDeserializeSMAlterKeyExpirationReq(pReq->pCont, pReq->contLen, &expReq);
  if (code != 0) {
    mError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter key expiration req, days:%d, strategy:%s", expReq.days, expReq.strategy);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and fan out to the dnodes; audit only when that succeeded
  code = mndProcessAlterKeyExpirationReqImpl(pReq, &expReq);
  if (code == 0) {
    // Audit detail has the form "<days> DAYS <strategy>"
    char auditDetail[128];
    snprintf(auditDetail, sizeof(auditDetail), "%d DAYS %s", expReq.days, expReq.strategy);
    auditRecord(pReq, pMnode->clusterId, "alterKeyExpiration", "", auditDetail, expReq.sql, expReq.sqlLen, 0, 0);
  }
#else
  // Community edition - no encryption support
  mError("key expiration management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to alter key expiration, days:%d, strategy:%s, since %s", expReq.days, expReq.strategy,
           tstrerror(code));
  }

  tFreeSMAlterKeyExpirationReq(&expReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc