• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4984

13 Mar 2026 03:38AM UTC coverage: 68.643% (-0.01%) from 68.653%
#4984

push

travis-ci

web-flow
feat/6641435300-save-audit-in-self (#34738)

434 of 584 new or added lines in 10 files covered. (74.32%)

3048 existing lines in 150 files now uncovered.

212713 of 309883 relevant lines covered (68.64%)

135561814.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.93
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
static const char *offlineReason[] = {
46
    "",
47
    "status msg timeout",
48
    "status not received",
49
    "version not match",
50
    "dnodeId not match",
51
    "clusterId not match",
52
    "statusInterval not match",
53
    "timezone not match",
54
    "locale not match",
55
    "charset not match",
56
    "ttlChangeOnWrite not match",
57
    "enableWhiteList not match",
58
    "encryptionKey not match",
59
    "monitor not match",
60
    "monitor switch not match",
61
    "monitor interval not match",
62
    "monitor slow log threshold not match",
63
    "monitor slow log sql max len not match",
64
    "monitor slow log scope not match",
65
    "unknown",
66
};
67

68
enum {
69
  DND_ACTIVE_CODE,
70
  DND_CONN_ACTIVE_CODE,
71
};
72

73
enum {
74
  DND_CREATE,
75
  DND_ADD,
76
  DND_DROP,
77
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq);
110

111
#ifdef _GRANT
112
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
113
#else
114
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
115
#endif
116

117
int32_t mndInitDnode(SMnode *pMnode) {
429,135✔
118
  SSdbTable table = {
429,135✔
119
      .sdbType = SDB_DNODE,
120
      .keyType = SDB_KEY_INT32,
121
      .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
122
      .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
123
      .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
124
      .insertFp = (SdbInsertFp)mndDnodeActionInsert,
125
      .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
126
      .deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
127
  };
128

129
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
429,135✔
130
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
429,135✔
131
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
429,135✔
132
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
429,135✔
133
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
429,135✔
134
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
429,135✔
135
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
429,135✔
136
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
429,135✔
137
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
429,135✔
138
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
429,135✔
139
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
429,135✔
140
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
429,135✔
141
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
429,135✔
142
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);
429,135✔
143

144
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
429,135✔
145
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
429,135✔
146
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
429,135✔
147
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
429,135✔
148

149
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
429,135✔
150
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
429,135✔
151
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);
429,135✔
152
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_KEY_EXPIRATION, mndProcessAlterKeyExpirationReq);
429,135✔
153

154
  return sdbSetTable(pMnode->pSdb, table);
429,135✔
155
}
156

157
void mndCleanupDnode(SMnode *pMnode) {}
429,074✔
158

159
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
308,870✔
160
  int32_t  code = -1;
308,870✔
161
  SSdbRaw *pRaw = NULL;
308,870✔
162
  STrans  *pTrans = NULL;
308,870✔
163

164
  SDnodeObj dnodeObj = {0};
308,870✔
165
  dnodeObj.id = 1;
308,870✔
166
  dnodeObj.createdTime = taosGetTimestampMs();
308,870✔
167
  dnodeObj.updateTime = dnodeObj.createdTime;
308,870✔
168
  dnodeObj.port = tsServerPort;
308,870✔
169
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
308,870✔
170
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
308,870✔
171
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
308,870✔
172
  char *machineId = NULL;
308,870✔
173
  code = tGetMachineId(&machineId);
308,870✔
174
  if (machineId) {
308,870✔
175
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
308,870✔
176
    taosMemoryFreeClear(machineId);
308,870✔
177
  } else {
178
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
179
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
180
    goto _OVER;
×
181
#endif
182
  }
183

184
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
308,870✔
185
  if (pTrans == NULL) {
308,870✔
186
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
187
    if (terrno != 0) code = terrno;
×
188
    goto _OVER;
×
189
  }
190
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
308,870✔
191

192
  pRaw = mndDnodeActionEncode(&dnodeObj);
308,870✔
193
  if (pRaw == NULL) {
308,870✔
194
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
195
    if (terrno != 0) code = terrno;
×
196
    goto _OVER;
×
197
  }
198
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
308,870✔
199
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
308,870✔
200
  pRaw = NULL;
308,870✔
201

202
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
308,870✔
203
  code = 0;
308,870✔
204

205
_OVER:
308,870✔
206
  mndTransDrop(pTrans);
308,870✔
207
  sdbFreeRaw(pRaw);
308,870✔
208
  return code;
308,870✔
209
}
210

211
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
2,399,995✔
212
  int32_t code = 0;
2,399,995✔
213
  int32_t lino = 0;
2,399,995✔
214
  terrno = TSDB_CODE_OUT_OF_MEMORY;
2,399,995✔
215

216
  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
2,399,995✔
217
  if (pRaw == NULL) goto _OVER;
2,399,995✔
218

219
  int32_t dataPos = 0;
2,399,995✔
220
  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
2,399,995✔
221
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
2,399,995✔
222
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
2,399,995✔
223
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
2,399,995✔
224
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
2,399,995✔
225
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
2,399,995✔
226
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
2,399,995✔
227
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
2,399,995✔
228
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
2,399,995✔
229
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
2,399,995✔
230

231
  terrno = 0;
2,399,995✔
232

233
_OVER:
2,399,995✔
234
  if (terrno != 0) {
2,399,995✔
235
    mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
×
236
    sdbFreeRaw(pRaw);
×
237
    return NULL;
×
238
  }
239

240
  mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
2,399,995✔
241
  return pRaw;
2,399,995✔
242
}
243

244
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
1,602,022✔
245
  int32_t code = 0;
1,602,022✔
246
  int32_t lino = 0;
1,602,022✔
247
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,602,022✔
248
  SSdbRow   *pRow = NULL;
1,602,022✔
249
  SDnodeObj *pDnode = NULL;
1,602,022✔
250

251
  int8_t sver = 0;
1,602,022✔
252
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
1,602,022✔
253
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
1,602,022✔
254
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
255
    goto _OVER;
×
256
  }
257

258
  pRow = sdbAllocRow(sizeof(SDnodeObj));
1,602,022✔
259
  if (pRow == NULL) goto _OVER;
1,602,022✔
260

261
  pDnode = sdbGetRowObj(pRow);
1,602,022✔
262
  if (pDnode == NULL) goto _OVER;
1,602,022✔
263

264
  int32_t dataPos = 0;
1,602,022✔
265
  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
1,602,022✔
266
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
1,602,022✔
267
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
1,602,022✔
268
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
1,602,022✔
269
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
1,602,022✔
270
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
1,602,022✔
271
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
1,602,022✔
272
  if (sver > 1) {
1,602,022✔
273
    int16_t keyLen = 0;
1,602,022✔
274
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
1,602,022✔
275
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
1,602,022✔
276
    SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
1,602,022✔
277
    SDB_GET_BINARY(pRaw, dataPos, NULL, keyLen, _OVER)
1,602,022✔
278
  }
279

280
  terrno = 0;
1,602,022✔
281
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
1,602,022✔
282
    mInfo("dnode:%d, endpoint changed", pDnode->id);
×
283
  }
284

285
_OVER:
1,602,022✔
286
  if (terrno != 0) {
1,602,022✔
287
    mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
×
288
    taosMemoryFreeClear(pRow);
×
289
    return NULL;
×
290
  }
291

292
  mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
1,602,022✔
293
  return pRow;
1,602,022✔
294
}
295

296
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
749,873✔
297
  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
749,873✔
298
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
749,873✔
299

300
  char ep[TSDB_EP_LEN] = {0};
749,873✔
301
  (void)snprintf(ep, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
749,873✔
302
  tstrncpy(pDnode->ep, ep, TSDB_EP_LEN);
749,873✔
303
  return 0;
749,873✔
304
}
305

306
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
1,601,977✔
307
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
1,601,977✔
308
  return 0;
1,601,977✔
309
}
310

311
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
843,144✔
312
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
843,144✔
313
  pOld->updateTime = pNew->updateTime;
843,144✔
314
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
315
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
843,144✔
316
#endif
317
  return 0;
843,144✔
318
}
319

320
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
169,244,190✔
321
  SSdb      *pSdb = pMnode->pSdb;
169,244,190✔
322
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
169,244,190✔
323
  if (pDnode == NULL) {
169,244,706✔
324
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
212,546✔
325
      terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
16,286✔
326
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
196,260✔
327
      terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
×
328
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
196,260✔
329
      terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
196,260✔
330
    } else {
331
      terrno = TSDB_CODE_APP_ERROR;
×
332
      mFatal("dnode:%d, failed to acquire db since %s", dnodeId, terrstr());
×
333
    }
334
  }
335

336
  return pDnode;
169,244,706✔
337
}
338

339
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
170,300,682✔
340
  SSdb *pSdb = pMnode->pSdb;
170,300,682✔
341
  sdbRelease(pSdb, pDnode);
170,300,682✔
342
}
170,300,682✔
343

344
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
9,585,191✔
345
  SEpSet epSet = {0};
9,585,191✔
346
  terrno = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
9,585,191✔
347
  return epSet;
9,585,191✔
348
}
349

350
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
909,834✔
351
  SEpSet     epSet = {0};
909,834✔
352
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
909,834✔
353
  if (!pDnode) return epSet;
909,834✔
354

355
  epSet = mndGetDnodeEpset(pDnode);
909,834✔
356

357
  mndReleaseDnode(pMnode, pDnode);
909,834✔
358
  return epSet;
909,834✔
359
}
360

361
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
1,323,877✔
362
  SSdb *pSdb = pMnode->pSdb;
1,323,877✔
363

364
  void *pIter = NULL;
1,323,877✔
365
  while (1) {
2,121,756✔
366
    SDnodeObj *pDnode = NULL;
3,445,633✔
367
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
3,445,633✔
368
    if (pIter == NULL) break;
3,445,633✔
369

370
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
2,743,577✔
371
      sdbCancelFetch(pSdb, pIter);
621,821✔
372
      return pDnode;
621,821✔
373
    }
374

375
    sdbRelease(pSdb, pDnode);
2,121,756✔
376
  }
377

378
  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
702,056✔
379
  return NULL;
702,056✔
380
}
381

382
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
153,230✔
383
  SSdb *pSdb = pMnode->pSdb;
153,230✔
384

385
  void *pIter = NULL;
153,230✔
386
  while (1) {
162,673✔
387
    SDnodeObj *pDnode = NULL;
315,903✔
388
    ESdbStatus objStatus = 0;
315,903✔
389
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
315,903✔
390
    if (pIter == NULL) break;
315,903✔
391

392
    if (taosStrncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
315,903✔
393
      sdbCancelFetch(pSdb, pIter);
153,230✔
394
      return pDnode;
153,230✔
395
    }
396

397
    sdbRelease(pSdb, pDnode);
162,673✔
398
  }
399

400
  return NULL;
×
401
}
402

403
int32_t mndGetDnodeSize(SMnode *pMnode) {
83,069,511✔
404
  SSdb *pSdb = pMnode->pSdb;
83,069,511✔
405
  return sdbGetSize(pSdb, SDB_DNODE);
83,070,950✔
406
}
407

408
int32_t mndGetDbSize(SMnode *pMnode) {
×
409
  SSdb *pSdb = pMnode->pSdb;
×
410
  return sdbGetSize(pSdb, SDB_DB);
×
411
}
412

413
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
101,758,659✔
414
  int64_t interval = TABS(pDnode->lastAccessTime - curMs);
101,758,659✔
415
  if (interval > (int64_t)tsStatusTimeoutMs) {
101,758,532✔
416
    if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
2,235,132✔
417
      pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
44,924✔
418
    }
419
    return false;
2,234,817✔
420
  }
421
  return true;
99,523,400✔
422
}
423

424
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
2,050,881✔
425
  SSdb *pSdb = pMnode->pSdb;
2,050,881✔
426

427
  int32_t numOfEps = 0;
2,050,881✔
428
  void   *pIter = NULL;
2,050,881✔
429
  while (1) {
6,142,490✔
430
    SDnodeObj *pDnode = NULL;
8,193,371✔
431
    ESdbStatus objStatus = 0;
8,193,371✔
432
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
8,193,371✔
433
    if (pIter == NULL) break;
8,193,371✔
434

435
    SDnodeEp dnodeEp = {0};
6,142,490✔
436
    dnodeEp.id = pDnode->id;
6,142,490✔
437
    dnodeEp.ep.port = pDnode->port;
6,142,490✔
438
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
6,142,490✔
439
    sdbRelease(pSdb, pDnode);
6,142,490✔
440

441
    dnodeEp.isMnode = 0;
6,142,490✔
442
    if (mndIsMnode(pMnode, pDnode->id)) {
6,142,490✔
443
      dnodeEp.isMnode = 1;
2,484,150✔
444
    }
445
    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
6,142,490✔
446
      mError("failed to put ep into array, but continue at this call");
×
447
    }
448
  }
449
}
2,050,881✔
450

451
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
24,643,066✔
452
  SSdb   *pSdb = pMnode->pSdb;
24,643,066✔
453
  int32_t code = 0;
24,643,066✔
454

455
  int32_t numOfEps = 0;
24,643,066✔
456
  void   *pIter = NULL;
24,643,066✔
457
  while (1) {
101,387,317✔
458
    SDnodeObj *pDnode = NULL;
126,030,383✔
459
    ESdbStatus objStatus = 0;
126,030,383✔
460
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
126,030,383✔
461
    if (pIter == NULL) break;
126,030,383✔
462

463
    SDnodeInfo dInfo;
101,386,918✔
464
    dInfo.id = pDnode->id;
101,387,317✔
465
    dInfo.ep.port = pDnode->port;
101,387,317✔
466
    dInfo.offlineReason = pDnode->offlineReason;
101,387,317✔
467
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
101,387,317✔
468
    sdbRelease(pSdb, pDnode);
101,387,317✔
469
    if (mndIsMnode(pMnode, pDnode->id)) {
101,387,317✔
470
      dInfo.isMnode = 1;
28,286,884✔
471
    } else {
472
      dInfo.isMnode = 0;
73,100,433✔
473
    }
474

475
    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
101,387,317✔
476
      code = terrno;
×
477
      sdbCancelFetch(pSdb, pIter);
×
478
      break;
×
479
    }
480
  }
481
  TAOS_RETURN(code);
24,643,066✔
482
}
483

484
#define CHECK_MONITOR_PARA(para, err)                                                                    \
485
  if (pCfg->monitorParas.para != para) {                                                                 \
486
    mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
487
    terrno = err;                                                                                        \
488
    return err;                                                                                          \
489
  }
490

491
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
×
492
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
×
493
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
×
494
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
×
495
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
×
496
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
×
497

498
  if (0 != taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
×
499
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
×
500
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
501
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
×
502
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
×
503
  }
504

505
  /*
506
  if (pCfg->statusIntervalMs != tsStatusIntervalMs) {
507
    mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs,
508
           tsStatusIntervalMs);
509
    terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL;
510
    return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
511
  }
512
  */
513

514
  if ((0 != taosStrcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
×
515
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
×
516
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
517
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
×
518
    return DND_REASON_TIME_ZONE_NOT_MATCH;
×
519
  }
520

521
  if (0 != taosStrcasecmp(pCfg->locale, tsLocale)) {
×
522
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
×
523
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
×
524
    return DND_REASON_LOCALE_NOT_MATCH;
×
525
  }
526

527
  if (0 != taosStrcasecmp(pCfg->charset, tsCharset)) {
×
528
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
×
529
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
×
530
    return DND_REASON_CHARSET_NOT_MATCH;
×
531
  }
532

533
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
×
534
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
×
535
           tsTtlChangeOnWrite);
536
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
×
537
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
×
538
  }
539
  int8_t enable = tsEnableWhiteList ? 1 : 0;
×
540
  if (pCfg->enableWhiteList != enable) {
×
541
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, enable);
×
542
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
×
543
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
×
544
  }
545

546
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
×
547
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
×
548
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
×
549
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
550
    terrno = pCfg->encryptionKeyChksum ? TSDB_CODE_DNODE_INVALID_ENCRYPTKEY : TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
×
551
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
×
552
  }
553

554
  return DND_REASON_ONLINE;
×
555
}
556

557
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
100,353✔
558
  if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
100,353✔
559
    return 0.0;
655✔
560
  }
561

562
  int64_t deltaCount = currentCount - lastCount;
99,698✔
563
  int64_t deltaMs = currentTimeMs - lastTimeMs;
99,698✔
564
  double  rate = (double)deltaCount / (double)deltaMs;
99,698✔
565
  return rate;
99,698✔
566
}
567

568
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
126,250,169✔
569
  bool stateChanged = false;
126,250,169✔
570
  bool roleChanged = pGid->syncState != pVload->syncState ||
126,258,605✔
571
                     (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
247,725,940✔
572
                     pGid->roleTimeMs != pVload->roleTimeMs;
121,475,771✔
573

574
  if (pVload->syncCommitIndex > pVload->syncAppliedIndex) {
126,250,169✔
575
    if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
182,886✔
576
      pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
74,177✔
577
    } else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
108,709✔
578
      int64_t currentTimeMs = taosGetTimestampMs();
100,353✔
579
      pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
100,353✔
580
                                          pGid->lastSyncAppliedIndexUpdateTime);
581

582
      pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
100,353✔
583
    }
584
  }
585

586
  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
126,250,169✔
587
  pGid->syncCommitIndex = pVload->syncCommitIndex;
126,250,169✔
588
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
126,250,169✔
589
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
126,250,169✔
590
  pGid->learnerProgress = pVload->learnerProgress;
126,250,169✔
591
  pGid->snapSeq = pVload->snapSeq;
126,250,169✔
592
  pGid->syncTotalIndex = pVload->syncTotalIndex;
126,250,169✔
593
  if (pVload->snapSeq > 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
126,250,169✔
594
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
646,483✔
595
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
596
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
597
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
598
  }
599

600
  if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
126,250,169✔
601
      pGid->startTimeMs != pVload->startTimeMs) {
120,956,025✔
602
    mInfo(
5,294,144✔
603
        "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
604
        "canRead:%d, dnode:%d",
605
        vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
606
        pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
607
    pGid->syncState = pVload->syncState;
5,294,144✔
608
    pGid->syncTerm = pVload->syncTerm;
5,294,144✔
609
    pGid->syncRestore = pVload->syncRestore;
5,294,144✔
610
    pGid->syncCanRead = pVload->syncCanRead;
5,294,144✔
611
    pGid->startTimeMs = pVload->startTimeMs;
5,294,144✔
612
    pGid->roleTimeMs = pVload->roleTimeMs;
5,294,144✔
613
    stateChanged = true;
5,294,144✔
614
  }
615
  return stateChanged;
126,250,169✔
616
}
617

618
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
29,272,302✔
619
  bool stateChanged = false;
29,272,302✔
620
  bool roleChanged = pObj->syncState != pMload->syncState ||
29,278,173✔
621
                     (pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
58,104,690✔
622
                     pObj->roleTimeMs != pMload->roleTimeMs;
28,832,388✔
623
  if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
29,272,302✔
624
    mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
446,218✔
625
          pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
626
          pObj->syncTerm, pMload->syncTerm);
627
    pObj->syncState = pMload->syncState;
446,218✔
628
    pObj->syncTerm = pMload->syncTerm;
446,218✔
629
    pObj->syncRestore = pMload->syncRestore;
446,218✔
630
    pObj->roleTimeMs = pMload->roleTimeMs;
446,218✔
631
    stateChanged = true;
446,218✔
632
  }
633
  return stateChanged;
29,272,302✔
634
}
635

636
extern char   *tsMonFwUri;
637
extern char   *tsMonSlowLogUri;
638
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
448✔
639
  SMnode    *pMnode = pReq->info.node;
448✔
640
  SStatisReq statisReq = {0};
448✔
641
  int32_t    code = -1;
448✔
642

643
  TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
448✔
644

645
  if (tsMonitorLogProtocol) {
448✔
646
    mInfo("process statis req,\n %s", statisReq.pCont);
67✔
647
  }
648

649
  if (statisReq.type == MONITOR_TYPE_COUNTER) {
448✔
650
    monSendContent(statisReq.pCont, tsMonFwUri);
448✔
651
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
×
652
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
×
653
  }
654

655
  tFreeSStatisReq(&statisReq);
448✔
656
  return 0;
448✔
657
}
658

659
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
1,034✔
660
  mTrace("process audit req:%p", pReq);
1,034✔
661
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
1,034✔
662
    SMnode   *pMnode = pReq->info.node;
1,034✔
663
    SAuditReq auditReq = {0};
1,034✔
664

665
    TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
1,034✔
666

667
    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
1,034✔
668

669
    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
1,034✔
670
                   auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);
671

672
    tFreeSAuditReq(&auditReq);
1,034✔
673
  }
674
  return 0;
1,034✔
675
}
676

677
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
×
678
  mTrace("process audit req:%p", pReq);
×
679
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
×
680
    SMnode        *pMnode = pReq->info.node;
×
681
    SBatchAuditReq auditReq = {0};
×
682

683
    TAOS_CHECK_RETURN(tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq));
×
684

685
    int32_t nAudit = taosArrayGetSize(auditReq.auditArr);
×
686

687
    for (int32_t i = 0; i < nAudit; ++i) {
×
688
      SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, i);
×
689
      mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);
×
690

691
      auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
×
692
                     audit->duration, audit->affectedRows);
693
    }
694

695
    tFreeSBatchAuditReq(&auditReq);
×
696
  }
697
  return 0;
×
698
}
699

700
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
711,838✔
701
  int32_t       code = 0, lino = 0;
711,838✔
702
  SDnodeInfoReq infoReq = {0};
711,838✔
703
  int32_t       contLen = 0;
711,838✔
704
  void         *pReq = NULL;
711,838✔
705

706
  infoReq.dnodeId = pDnode->id;
711,838✔
707
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
711,838✔
708

709
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
711,838✔
710
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
×
711
  }
712
  pReq = rpcMallocCont(contLen);
711,838✔
713
  if (pReq == NULL) {
711,838✔
714
    TAOS_RETURN(terrno);
×
715
  }
716

717
  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
711,838✔
718
    code = contLen;
×
719
    goto _exit;
×
720
  }
721

722
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
711,838✔
723
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
711,838✔
724
_exit:
711,838✔
725
  if (code < 0) {
711,838✔
726
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
×
727
  }
728
  TAOS_RETURN(code);
711,838✔
729
}
730

731
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
711,838✔
732
  int32_t       code = 0, lino = 0;
711,838✔
733
  SMnode       *pMnode = pReq->info.node;
711,838✔
734
  SDnodeInfoReq infoReq = {0};
711,838✔
735
  SDnodeObj    *pDnode = NULL;
711,838✔
736
  STrans       *pTrans = NULL;
711,838✔
737
  SSdbRaw      *pCommitRaw = NULL;
711,838✔
738

739
  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
711,838✔
740

741
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
711,838✔
742
  if (pDnode == NULL) {
711,838✔
UNCOV
743
    TAOS_CHECK_EXIT(terrno);
×
744
  }
745

746
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
711,838✔
747
  if (pTrans == NULL) {
711,838✔
748
    TAOS_CHECK_EXIT(terrno);
×
749
  }
750

751
  pDnode->updateTime = taosGetTimestampMs();
711,838✔
752

753
  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
711,838✔
754
    TAOS_CHECK_EXIT(terrno);
×
755
  }
756
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
711,838✔
757
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
×
758
    TAOS_CHECK_EXIT(code);
×
759
  }
760
  TAOS_CHECK_EXIT(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
711,838✔
761
  pCommitRaw = NULL;
711,838✔
762

763
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
711,838✔
764
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
201✔
765
    TAOS_CHECK_EXIT(code);
201✔
766
  }
767

768
_exit:
711,838✔
769
  mndReleaseDnode(pMnode, pDnode);
711,838✔
770
  if (code != 0) {
711,838✔
771
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
201✔
772
  }
773
  mndTransDrop(pTrans);
711,838✔
774
  sdbFreeRaw(pCommitRaw);
711,838✔
775
  TAOS_RETURN(code);
711,838✔
776
}
777

778
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
45,684,056✔
779
  SMnode    *pMnode = pReq->info.node;
45,684,056✔
780
  SStatusReq statusReq = {0};
45,684,056✔
781
  SDnodeObj *pDnode = NULL;
45,684,056✔
782
  int32_t    code = -1;
45,684,056✔
783
  int32_t    lino = 0;
45,684,056✔
784

785
  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), &lino, _OVER);
45,684,056✔
786

787
  int64_t clusterid = mndGetClusterId(pMnode);
45,684,056✔
788
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
45,684,056✔
UNCOV
789
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
790
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
×
791
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
NEW
792
    TAOS_CHECK_GOTO(code, &lino, _OVER);
×
793
  }
794

795
  if (statusReq.dnodeId == 0) {
45,684,056✔
796
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
1,011,722✔
797
    if (pDnode == NULL) {
1,011,722✔
798
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
389,968✔
799
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
389,968✔
800
      if (terrno != 0) code = terrno;
389,968✔
801
      TAOS_CHECK_GOTO(code, &lino, _OVER);
389,968✔
802
    }
803
  } else {
804
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
44,672,334✔
805
    if (pDnode == NULL) {
44,672,334✔
806
      int32_t err = terrno;
160,553✔
807
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
160,553✔
808
      if (pDnode != NULL) {
160,553✔
809
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
67✔
810
        code = err;
67✔
811
        TAOS_CHECK_GOTO(code, &lino, _OVER);
67✔
812
      }
813

814
      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
160,486✔
815
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
160,486✔
816
        code = err;
7,256✔
817
        TAOS_CHECK_GOTO(code, &lino, _OVER);
7,256✔
818
      } else {
819
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
153,230✔
820
        if (pDnode == NULL) goto _OVER;
153,230✔
821
      }
822
    }
823
  }
824

825
  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
45,286,765✔
826
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);
45,286,765✔
827

828
  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
45,286,765✔
829
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
45,286,765✔
830
  int64_t curMs = taosGetTimestampMs();
45,286,765✔
831
  bool    online = mndIsDnodeOnline(pDnode, curMs);
45,286,765✔
832
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
45,286,765✔
833
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
45,286,765✔
834
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
45,286,765✔
835
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
45,286,765✔
836
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
45,286,765✔
837
  bool    analVerChanged = (analVer != statusReq.analVer);
45,286,765✔
838
  bool    auditDBChanged = false;
45,286,765✔
839
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
45,286,765✔
840
  bool    auditInfoChanged = false;
45,286,765✔
841
  char    auditToken[TSDB_TOKEN_LEN] = {0};
45,286,765✔
842

843
  SDbObj *pDb = NULL;
45,286,765✔
844
  if (tsAuditUseToken || tsAuditSaveInSelf) {
45,286,765✔
845
    pDb = mndAcquireAuditDb(pMnode);
45,286,765✔
846
  }
847
  if (tsAuditUseToken) {
45,286,765✔
848
    if (pDb != NULL) {
45,285,421✔
849
      SName name = {0};
1,056✔
850
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0)
1,056✔
UNCOV
851
        mError("db:%s, failed to parse db name", pDb->name);
×
852
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
1,056✔
853
    }
854
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;
45,285,421✔
855

856
    char    auditUser[TSDB_USER_LEN] = {0};
45,285,421✔
857
    int32_t ret = 0;
45,285,421✔
858
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
45,285,421✔
859
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
45,284,431✔
860
    } else {
861
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
990✔
862
      int32_t ret = 0;
990✔
863
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
990✔
NEW
864
        mTrace("dnode:%d, failed to get audit user active token, token:xxxx, since %s", pDnode->id, tstrerror(ret));
×
865
      } else {
866
        mTrace("dnode:%d, get audit user active token:xxxx", pDnode->id);
990✔
867
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditInfoChanged = true;
990✔
868
      }
869
    }
870
  }
871

872
  SEpSet  auditVnodeEpSet = {0};
45,286,765✔
873
  int32_t auditVgId = 0;
45,286,765✔
874
  if (tsAuditSaveInSelf) {
45,286,765✔
875
    if (pDb != NULL) {
1,344✔
876
      void   *pIter = NULL;
1,088✔
877
      SVgObj *pVgroup = NULL;
1,088✔
878
      while (1) {
879
        pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
3,264✔
880
        if (pIter == NULL) break;
3,264✔
881

882
        if (mndVgroupInDb(pVgroup, pDb->uid)) {
3,200✔
883
          auditVnodeEpSet = mndGetVgroupEpset(pMnode, pVgroup);
1,024✔
884
          auditVgId = pVgroup->vgId;
1,024✔
885
          sdbCancelFetch(pMnode->pSdb, pIter);
1,024✔
886
          sdbRelease(pMnode->pSdb, pVgroup);
1,024✔
887
          break;
1,024✔
888
        }
889
        sdbRelease(pMnode->pSdb, pVgroup);
2,176✔
890
      }
891
    }
892

893
    if (auditVnodeEpSet.numOfEps != statusReq.auditEpSet.numOfEps) {
1,344✔
894
      auditInfoChanged = true;
64✔
895
      mTrace("dnode:%d, audit epset num changed, auditNum:%d, inReq:%d", pDnode->id, auditVnodeEpSet.numOfEps,
64✔
896
             statusReq.auditEpSet.numOfEps);
897
    } else {
898
      for (int32_t i = 0; i < auditVnodeEpSet.numOfEps; i++) {
2,240✔
899
        if (strncmp(auditVnodeEpSet.eps[i].fqdn, statusReq.auditEpSet.eps[i].fqdn, TSDB_FQDN_LEN) != 0 ||
960✔
900
            auditVnodeEpSet.eps[i].port != statusReq.auditEpSet.eps[i].port) {
960✔
901
          // do not need to check InUse here, because inUse is not accurate at every time
NEW
902
          auditInfoChanged = true;
×
NEW
903
          mTrace("dnode:%d, audit epset changed at item:%d, fqdn:%s:%d:, inReq:%s:%d", pDnode->id, i,
×
904
                 auditVnodeEpSet.eps[i].fqdn, auditVnodeEpSet.eps[i].port, statusReq.auditEpSet.eps[i].fqdn,
905
                 statusReq.auditEpSet.eps[i].port);
NEW
906
          break;
×
907
        }
908
      }
909
    }
910

911
    if (auditVgId != statusReq.auditVgId) {
1,344✔
912
      auditInfoChanged = true;
64✔
913
      mTrace("dnode:%d, audit vgId changed, auditVgId:%d, inReq:%d", pDnode->id, auditVgId, statusReq.auditVgId);
64✔
914
    }
915
  }
916

917
  if (pDb != NULL) {
45,286,765✔
918
    mndReleaseDb(pMnode, pDb);
2,144✔
919
  }
920

921
  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
44,642,287✔
922
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
43,236,302✔
923
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditInfoChanged;
89,929,052✔
924
  const STraceId *trace = &pReq->info.traceId;
45,286,765✔
925
  char            timestamp[TD_TIME_STR_LEN] = {0};
45,286,765✔
926
  if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
45,286,765✔
927
  mGTrace(
45,286,765✔
928
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
929
      "timestamp:%s",
930
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
931

932
  if (reboot) {
45,286,765✔
933
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
663,202✔
934
  }
935

936
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
45,286,765✔
937
  if (labs(delta) >= tsTimestampDeltaLimit) {
45,286,765✔
UNCOV
938
    terrno = TSDB_CODE_TIME_UNSYNCED;
×
UNCOV
939
    code = terrno;
×
940

UNCOV
941
    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
×
UNCOV
942
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
×
943
           tstrerror(code), tsTimestampDeltaLimit);
NEW
944
    TAOS_CHECK_GOTO(code, &lino, _OVER);
×
945
  }
946
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
172,162,012✔
947
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
126,875,247✔
948

949
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
126,875,247✔
950
    if (pVgroup != NULL) {
126,875,247✔
951
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
126,302,559✔
952
        pVgroup->cacheUsage = pVload->cacheUsage;
99,674,541✔
953
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
99,674,541✔
954
        pVgroup->numOfTables = pVload->numOfTables;
99,674,541✔
955
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
99,674,541✔
956
        pVgroup->totalStorage = pVload->totalStorage;
99,674,541✔
957
        pVgroup->compStorage = pVload->compStorage;
99,674,541✔
958
        pVgroup->pointsWritten = pVload->pointsWritten;
99,674,541✔
959
      }
960
      bool stateChanged = false;
126,302,559✔
961
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
163,257,451✔
962
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
163,205,061✔
963
        if (pGid->dnodeId == statusReq.dnodeId) {
163,205,061✔
964
          if (pVload->startTimeMs == 0) {
126,250,169✔
UNCOV
965
            pVload->startTimeMs = statusReq.rebootTime;
×
966
          }
967
          if (pVload->roleTimeMs == 0) {
126,250,169✔
UNCOV
968
            pVload->roleTimeMs = statusReq.rebootTime;
×
969
          }
970
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
126,250,169✔
971
          break;
126,250,169✔
972
        }
973
      }
974
      if (stateChanged) {
126,302,559✔
975
        SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
5,294,144✔
976
        if (pDb != NULL && pDb->stateTs != curMs) {
5,294,144✔
977
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
3,670,312✔
978
                pDb->stateTs, curMs);
979
          pDb->stateTs = curMs;
3,670,312✔
980
        }
981
        mndReleaseDb(pMnode, pDb);
5,294,144✔
982
      }
983
    }
984

985
    mndReleaseVgroup(pMnode, pVgroup);
126,875,247✔
986
  }
987

988
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
45,286,765✔
989
  if (pObj != NULL) {
45,286,765✔
990
    if (statusReq.mload.roleTimeMs == 0) {
29,272,302✔
991
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
250,099✔
992
    }
993
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
29,272,302✔
994
    mndReleaseMnode(pMnode, pObj);
29,272,302✔
995
  }
996

997
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
45,286,765✔
998
  if (pQnode != NULL) {
45,286,765✔
999
    pQnode->load = statusReq.qload;
271,867✔
1000
    mndReleaseQnode(pMnode, pQnode);
271,867✔
1001
  }
1002

1003
  if (needCheck) {
45,286,765✔
1004
    if (statusReq.sver != tsVersion) {
2,050,881✔
UNCOV
1005
      if (pDnode != NULL) {
×
UNCOV
1006
        pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
×
1007
      }
UNCOV
1008
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
×
NEW
1009
      code = TSDB_CODE_VERSION_NOT_COMPATIBLE;
×
NEW
1010
      TAOS_CHECK_GOTO(code, &lino, _OVER);
×
1011
    }
1012

1013
    if (statusReq.dnodeId == 0) {
2,050,881✔
1014
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
621,754✔
1015
    } else {
1016
      if (statusReq.clusterId != pMnode->clusterId) {
1,429,127✔
UNCOV
1017
        if (pDnode != NULL) {
×
UNCOV
1018
          pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
×
1019
        }
UNCOV
1020
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
×
1021
               pMnode->clusterId);
NEW
1022
        code = TSDB_CODE_MND_INVALID_CLUSTER_ID;
×
NEW
1023
        TAOS_CHECK_GOTO(code, &lino, _OVER);
×
1024
      }
1025
    }
1026

1027
    // Verify whether the cluster parameters are consistent when status change from offline to ready
1028
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
1029
    // if (pDnode->offlineReason != 0) {
1030
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
1031
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
1032
    //   goto _OVER;
1033
    // }
1034

1035
    if (!online) {
2,050,881✔
1036
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
644,478✔
1037
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
1038
    } else {
1039
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
1,406,403✔
1040
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
1041
            "enableWhiteListChanged:%d auditDBChanged:%d auditInfoChanged:%d pMnode->ipWhiteVer:%" PRId64
1042
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
1043
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
1044
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditInfoChanged,
1045
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
1046
    }
1047

1048
    pDnode->rebootTime = statusReq.rebootTime;
2,050,881✔
1049
    pDnode->numOfCores = statusReq.numOfCores;
2,050,881✔
1050
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
2,050,881✔
1051
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
2,050,881✔
1052
    pDnode->memAvail = statusReq.memAvail;
2,050,881✔
1053
    pDnode->memTotal = statusReq.memTotal;
2,050,881✔
1054
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
2,050,881✔
1055
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
2,050,881✔
1056
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
2,050,881✔
1057
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
711,838✔
1058
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
711,838✔
NEW
1059
        TAOS_CHECK_GOTO(code, &lino, _OVER);
×
1060
      }
1061
    }
1062

1063
    SStatusRsp statusRsp = {0};
2,050,881✔
1064
    statusRsp.statusSeq++;
2,050,881✔
1065
    statusRsp.analVer = analVer;
2,050,881✔
1066
    statusRsp.dnodeVer = dnodeVer;
2,050,881✔
1067
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
2,050,881✔
1068
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
2,050,881✔
1069
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
2,050,881✔
1070
    if (statusRsp.pDnodeEps == NULL) {
2,050,881✔
NEW
1071
      code = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1072
      TAOS_CHECK_GOTO(code, &lino, _OVER);
×
1073
    }
1074

1075
    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
2,050,881✔
1076
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
2,050,881✔
1077
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;
2,050,881✔
1078

1079
    if (auditInfoChanged || auditDBChanged) {
2,050,881✔
1080
      if (tsAuditUseToken) {
196✔
1081
        if (auditDB[0] != '\0') {
132✔
1082
          mInfo("dnode:%d, set audit db:%s in process status rsp", statusReq.dnodeId, auditDB);
132✔
1083
          tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
132✔
1084
        }
1085
        if (auditToken[0] != '\0') {
132✔
1086
          mInfo("dnode:%d, set audit token:xxxx in process status rsp", statusReq.dnodeId);
66✔
1087
          tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
66✔
1088
        }
1089
      }
1090

1091
      if (tsAuditSaveInSelf) {
196✔
1092
        mInfo("dnode:%d, set audit epset and vgId:%d in process status rsp", statusReq.dnodeId, auditVgId);
64✔
1093
        statusRsp.auditEpSet = auditVnodeEpSet;
64✔
1094
        statusRsp.auditVgId = auditVgId;
64✔
1095
      }
1096
    }
1097

1098
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
2,050,881✔
1099
    void   *pHead = rpcMallocCont(contLen);
2,050,881✔
1100
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
2,050,881✔
1101
    taosArrayDestroy(statusRsp.pDnodeEps);
2,050,881✔
1102
    if (contLen < 0) {
2,050,881✔
UNCOV
1103
      code = contLen;
×
NEW
1104
      TAOS_CHECK_GOTO(code, &lino, _OVER);
×
1105
    }
1106

1107
    pReq->info.rspLen = contLen;
2,050,881✔
1108
    pReq->info.rsp = pHead;
2,050,881✔
1109
  }
1110

1111
  pDnode->accessTimes++;
45,286,765✔
1112
  pDnode->lastAccessTime = curMs;
45,286,765✔
1113
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
45,286,765✔
1114
    pDnode->offlineReason = DND_REASON_ONLINE;
643,740✔
1115
  }
1116
  code = 0;
45,286,765✔
1117

1118
_OVER:
45,684,056✔
1119
  mndReleaseDnode(pMnode, pDnode);
45,684,056✔
1120
  taosArrayDestroy(statusReq.pVloads);
45,684,056✔
1121
  if (code != 0) {
45,684,056✔
1122
    mError("dnode:%d, failed to process status req at line:%d since %s", statusReq.dnodeId, lino, tstrerror(code));
397,291✔
1123
    return code;
397,291✔
1124
  }
1125

1126
  return mndUpdClusterInfo(pReq);
45,286,765✔
1127
}
1128

UNCOV
1129
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
×
UNCOV
1130
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
1131
  SNotifyReq notifyReq = {0};
×
1132
  int32_t    code = 0;
×
1133

UNCOV
1134
  if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
×
UNCOV
1135
    terrno = code;
×
UNCOV
1136
    goto _OVER;
×
1137
  }
1138

UNCOV
1139
  int64_t clusterid = mndGetClusterId(pMnode);
×
UNCOV
1140
  if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
×
UNCOV
1141
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
×
UNCOV
1142
    mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
×
1143
          notifyReq.clusterId, clusterid, tstrerror(code));
UNCOV
1144
    goto _OVER;
×
1145
  }
1146

UNCOV
1147
  int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
×
UNCOV
1148
  for (int32_t v = 0; v < nVgroup; ++v) {
×
UNCOV
1149
    SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
×
1150

UNCOV
1151
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
×
UNCOV
1152
    if (pVgroup != NULL) {
×
UNCOV
1153
      pVgroup->numOfTimeSeries = pVload->nTimeSeries;
×
UNCOV
1154
      mndReleaseVgroup(pMnode, pVgroup);
×
1155
    }
1156
  }
UNCOV
1157
  code = mndUpdClusterInfo(pReq);
×
UNCOV
1158
_OVER:
×
UNCOV
1159
  tFreeSNotifyReq(&notifyReq);
×
UNCOV
1160
  return code;
×
1161
}
1162

1163
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
151,602✔
1164
  int32_t  code = -1;
151,602✔
1165
  SSdbRaw *pRaw = NULL;
151,602✔
1166
  STrans  *pTrans = NULL;
151,602✔
1167

1168
  SDnodeObj dnodeObj = {0};
151,602✔
1169
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
151,602✔
1170
  dnodeObj.createdTime = taosGetTimestampMs();
151,602✔
1171
  dnodeObj.updateTime = dnodeObj.createdTime;
151,602✔
1172
  dnodeObj.port = pCreate->port;
151,602✔
1173
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
151,602✔
1174
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
151,602✔
1175

1176
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
151,602✔
1177
  if (pTrans == NULL) {
151,602✔
UNCOV
1178
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1179
    if (terrno != 0) code = terrno;
×
UNCOV
1180
    goto _OVER;
×
1181
  }
1182
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
151,602✔
1183
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
151,602✔
1184

1185
  pRaw = mndDnodeActionEncode(&dnodeObj);
151,602✔
1186
  if (pRaw == NULL) {
151,602✔
UNCOV
1187
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1188
    if (terrno != 0) code = terrno;
×
UNCOV
1189
    goto _OVER;
×
1190
  }
1191
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
151,602✔
1192
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
151,602✔
1193
  pRaw = NULL;
151,602✔
1194

1195
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
151,602✔
1196
  code = 0;
151,602✔
1197

1198
_OVER:
151,602✔
1199
  mndTransDrop(pTrans);
151,602✔
1200
  sdbFreeRaw(pRaw);
151,602✔
1201
  return code;
151,602✔
1202
}
1203

1204
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
27,416✔
1205
  SMnode       *pMnode = pReq->info.node;
27,416✔
1206
  SSdb         *pSdb = pMnode->pSdb;
27,416✔
1207
  SDnodeObj    *pObj = NULL;
27,416✔
1208
  void         *pIter = NULL;
27,416✔
1209
  SDnodeListRsp rsp = {0};
27,416✔
1210
  int32_t       code = -1;
27,416✔
1211

1212
  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
27,416✔
1213
  if (NULL == rsp.dnodeList) {
27,416✔
1214
    mError("failed to alloc epSet while process dnode list req");
×
1215
    code = terrno;
×
UNCOV
1216
    goto _OVER;
×
1217
  }
1218

1219
  while (1) {
52,422✔
1220
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
79,838✔
1221
    if (pIter == NULL) break;
79,838✔
1222

1223
    SDNodeAddr dnodeAddr = {0};
52,422✔
1224
    dnodeAddr.nodeId = pObj->id;
52,422✔
1225
    dnodeAddr.epSet.numOfEps = 1;
52,422✔
1226
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
52,422✔
1227
    dnodeAddr.epSet.eps[0].port = pObj->port;
52,422✔
1228

1229
    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
104,844✔
UNCOV
1230
      if (terrno != 0) code = terrno;
×
UNCOV
1231
      sdbRelease(pSdb, pObj);
×
UNCOV
1232
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
1233
      goto _OVER;
×
1234
    }
1235

1236
    sdbRelease(pSdb, pObj);
52,422✔
1237
  }
1238

1239
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
27,416✔
1240
  void   *pRsp = rpcMallocCont(rspLen);
27,416✔
1241
  if (pRsp == NULL) {
27,416✔
UNCOV
1242
    code = terrno;
×
UNCOV
1243
    goto _OVER;
×
1244
  }
1245

1246
  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
27,416✔
UNCOV
1247
    code = rspLen;
×
1248
    goto _OVER;
×
1249
  }
1250

1251
  pReq->info.rspLen = rspLen;
27,416✔
1252
  pReq->info.rsp = pRsp;
27,416✔
1253
  code = 0;
27,416✔
1254

1255
_OVER:
27,416✔
1256

1257
  if (code != 0) {
27,416✔
UNCOV
1258
    mError("failed to get dnode list since %s", tstrerror(code));
×
1259
  }
1260

1261
  tFreeSDnodeListRsp(&rsp);
27,416✔
1262

1263
  TAOS_RETURN(code);
27,416✔
1264
}
1265

1266
void getSlowLogScopeString(int32_t scope, char *result) {
1,183✔
1267
  if (scope == SLOW_LOG_TYPE_NULL) {
1,183✔
UNCOV
1268
    (void)strncat(result, "NONE", 64);
×
UNCOV
1269
    return;
×
1270
  }
1271
  while (scope > 0) {
2,366✔
1272
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,183✔
1273
      (void)strncat(result, "QUERY", 64);
1,183✔
1274
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,183✔
1275
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1276
      (void)strncat(result, "INSERT", 64);
×
1277
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
UNCOV
1278
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
UNCOV
1279
      (void)strncat(result, "OTHERS", 64);
×
UNCOV
1280
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1281
    } else {
UNCOV
1282
      (void)printf("invalid slow log scope:%d", scope);
×
UNCOV
1283
      return;
×
1284
    }
1285

1286
    if (scope > 0) {
1,183✔
UNCOV
1287
      (void)strncat(result, "|", 64);
×
1288
    }
1289
  }
1290
}
1291

1292
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
151,602✔
1293
  SMnode         *pMnode = pReq->info.node;
151,602✔
1294
  int32_t         code = -1;
151,602✔
1295
  SDnodeObj      *pDnode = NULL;
151,602✔
1296
  SCreateDnodeReq createReq = {0};
151,602✔
1297
  int32_t         lino = 0;
151,602✔
1298
  int64_t         tss = taosGetTimestampMs();
151,602✔
1299

1300
  if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
151,602✔
UNCOV
1301
    goto _OVER;
×
1302
  }
1303

1304
  code = tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq);
151,602✔
1305
  TAOS_CHECK_GOTO(code, &lino, _OVER);
151,602✔
1306

1307
  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
151,602✔
1308
  code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE);
151,602✔
1309
  TAOS_CHECK_GOTO(code, &lino, _OVER);
151,602✔
1310

1311
  if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
151,602✔
UNCOV
1312
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
×
UNCOV
1313
    goto _OVER;
×
1314
  }
1315
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
1316
  // if (code != 0) {
1317
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
1318
  //          tstrerror(code));
1319
  //   goto _OVER;
1320
  // }
1321

1322
  char ep[TSDB_EP_LEN];
151,602✔
1323
  (void)snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
151,602✔
1324
  pDnode = mndAcquireDnodeByEp(pMnode, ep);
151,602✔
1325
  if (pDnode != NULL) {
151,602✔
UNCOV
1326
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
×
UNCOV
1327
    goto _OVER;
×
1328
  }
1329

1330
  code = mndCreateDnode(pMnode, pReq, &createReq);
151,602✔
1331
  if (code == 0) {
151,602✔
1332
    code = TSDB_CODE_ACTION_IN_PROGRESS;
151,602✔
1333
    tsGrantHBInterval = 5;
151,602✔
1334
  }
1335

1336
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
151,602✔
1337
    char obj[200] = {0};
151,602✔
1338
    (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
151,602✔
1339

1340
    int64_t tse = taosGetTimestampMs();
151,602✔
1341
    double  duration = (double)(tse - tss);
151,602✔
1342
    duration = duration / 1000;
151,602✔
1343
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
151,602✔
1344
  }
1345

1346
_OVER:
151,602✔
1347
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
151,602✔
1348
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
×
1349
  }
1350

1351
  mndReleaseDnode(pMnode, pDnode);
151,602✔
1352
  tFreeSCreateDnodeReq(&createReq);
151,602✔
1353
  TAOS_RETURN(code);
151,602✔
1354
}
1355

1356
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
1357

1358
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) { return mndProcessRestoreDnodeReqImpl(pReq); }
1,653✔
1359

1360
#ifndef TD_ENTERPRISE
1361
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) { return 0; }
1362
#endif
1363

1364
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,516✔
1365
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1366
  int32_t  code = -1;
9,516✔
1367
  SSdbRaw *pRaw = NULL;
9,516✔
1368
  STrans  *pTrans = NULL;
9,516✔
1369
  int32_t  lino = 0;
9,516✔
1370

1371
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,516✔
1372
  if (pTrans == NULL) {
9,516✔
1373
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1374
    if (terrno != 0) code = terrno;
×
UNCOV
1375
    goto _OVER;
×
1376
  }
1377
  mndTransSetGroupParallel(pTrans);
9,516✔
1378
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,516✔
1379
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,516✔
1380

1381
  pRaw = mndDnodeActionEncode(pDnode);
9,516✔
1382
  if (pRaw == NULL) {
9,516✔
UNCOV
1383
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1384
    if (terrno != 0) code = terrno;
×
UNCOV
1385
    goto _OVER;
×
1386
  }
1387
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,516✔
1388
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,516✔
1389
  pRaw = NULL;
9,516✔
1390

1391
  pRaw = mndDnodeActionEncode(pDnode);
9,516✔
1392
  if (pRaw == NULL) {
9,516✔
UNCOV
1393
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1394
    if (terrno != 0) code = terrno;
×
UNCOV
1395
    goto _OVER;
×
1396
  }
1397
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,516✔
1398
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,516✔
1399
  pRaw = NULL;
9,516✔
1400

1401
  if (pSObj != NULL) {
9,516✔
1402
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
377✔
1403
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
377✔
1404
  }
1405

1406
  if (pMObj != NULL) {
9,516✔
1407
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
211✔
1408
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
211✔
1409
  }
1410

1411
  if (pQObj != NULL) {
9,516✔
1412
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
144✔
1413
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
144✔
1414
  }
1415

1416
  if (pBObj != NULL) {
9,516✔
1417
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
787✔
1418
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
787✔
1419
  }
1420

1421
  if (numOfVnodes > 0) {
8,729✔
1422
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
7,271✔
1423
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
7,271✔
1424
  }
1425

1426
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,729✔
1427

1428
  code = 0;
8,729✔
1429

1430
_OVER:
9,516✔
1431
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,516✔
1432
  mndTransDrop(pTrans);
9,516✔
1433
  sdbFreeRaw(pRaw);
9,516✔
1434
  TAOS_RETURN(code);
9,516✔
1435
}
1436

UNCOV
1437
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
×
UNCOV
1438
  bool       isEmpty = false;
×
UNCOV
1439
  SMnodeObj *pMObj = NULL;
×
UNCOV
1440
  SQnodeObj *pQObj = NULL;
×
UNCOV
1441
  SSnodeObj *pSObj = NULL;
×
1442

UNCOV
1443
  pQObj = mndAcquireQnode(pMnode, dnodeId);
×
1444
  if (pQObj) goto _OVER;
×
1445

1446
  pSObj = mndAcquireSnode(pMnode, dnodeId);
×
UNCOV
1447
  if (pSObj) goto _OVER;
×
1448

UNCOV
1449
  pMObj = mndAcquireMnode(pMnode, dnodeId);
×
UNCOV
1450
  if (pMObj) goto _OVER;
×
1451

UNCOV
1452
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, dnodeId);
×
UNCOV
1453
  if (numOfVnodes > 0) goto _OVER;
×
1454

1455
  isEmpty = true;
×
1456
_OVER:
×
UNCOV
1457
  mndReleaseMnode(pMnode, pMObj);
×
UNCOV
1458
  mndReleaseQnode(pMnode, pQObj);
×
UNCOV
1459
  mndReleaseSnode(pMnode, pSObj);
×
UNCOV
1460
  return isEmpty;
×
1461
}
1462

1463
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
10,389✔
1464
  SMnode       *pMnode = pReq->info.node;
10,389✔
1465
  int32_t       code = -1;
10,389✔
1466
  SDnodeObj    *pDnode = NULL;
10,389✔
1467
  SMnodeObj    *pMObj = NULL;
10,389✔
1468
  SQnodeObj    *pQObj = NULL;
10,389✔
1469
  SSnodeObj    *pSObj = NULL;
10,389✔
1470
  SBnodeObj    *pBObj = NULL;
10,389✔
1471
  SDropDnodeReq dropReq = {0};
10,389✔
1472
  int64_t       tss = taosGetTimestampMs();
10,389✔
1473

1474
  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
10,389✔
1475

1476
  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
10,389✔
1477
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
1478
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);
10,389✔
1479

1480
  bool force = dropReq.force;
10,389✔
1481
  if (dropReq.unsafe) {
10,389✔
UNCOV
1482
    force = true;
×
1483
  }
1484

1485
  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
10,389✔
1486
  if (pDnode == NULL) {
10,389✔
UNCOV
1487
    int32_t err = terrno;
×
UNCOV
1488
    char    ep[TSDB_EP_LEN + 1] = {0};
×
UNCOV
1489
    (void)snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
×
UNCOV
1490
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
×
UNCOV
1491
    if (pDnode == NULL) {
×
UNCOV
1492
      code = err;
×
UNCOV
1493
      goto _OVER;
×
1494
    }
1495
  }
1496

1497
  pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
10,389✔
1498
  pSObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
10,389✔
1499
  pBObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
10,389✔
1500
  pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
10,389✔
1501
  if (pMObj != NULL) {
10,389✔
1502
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
821✔
1503
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
399✔
1504
      goto _OVER;
399✔
1505
    }
1506
    if (pMnode->selfDnodeId == dropReq.dnodeId) {
422✔
1507
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
211✔
1508
      goto _OVER;
211✔
1509
    }
1510
  }
1511

1512
#ifdef USE_MOUNT
1513
  if (mndHasMountOnDnode(pMnode, dropReq.dnodeId) && !force) {
9,779✔
1514
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
UNCOV
1515
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
×
1516
    goto _OVER;
×
1517
  }
1518
#endif
1519

1520
  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
9,779✔
1521
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
9,779✔
1522

1523
  if (isonline && force) {
9,779✔
UNCOV
1524
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
×
UNCOV
1525
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
×
1526
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
UNCOV
1527
    goto _OVER;
×
1528
  }
1529

1530
  mError("vnode num:%d", numOfVnodes);
9,779✔
1531

1532
  bool    vnodeOffline = false;
9,779✔
1533
  void   *pIter = NULL;
9,779✔
1534
  int32_t vgId = -1;
9,779✔
1535
  while (1) {
21,711✔
1536
    SVgObj *pVgroup = NULL;
31,490✔
1537
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
31,490✔
1538
    if (pIter == NULL) break;
31,490✔
1539

1540
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
65,568✔
1541
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
43,857✔
1542
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
43,857✔
1543
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
14,536✔
UNCOV
1544
          vgId = pVgroup->vgId;
×
UNCOV
1545
          vnodeOffline = true;
×
UNCOV
1546
          break;
×
1547
        }
1548
      }
1549
    }
1550

1551
    sdbRelease(pMnode->pSdb, pVgroup);
21,711✔
1552

1553
    if (vnodeOffline) {
21,711✔
1554
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
1555
      break;
×
1556
    }
1557
  }
1558

1559
  if (vnodeOffline && !force) {
9,779✔
UNCOV
1560
    code = TSDB_CODE_VND_VNODE_OFFLINE;
×
UNCOV
1561
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
×
1562
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
UNCOV
1563
    goto _OVER;
×
1564
  }
1565

1566
  if (!isonline && !force) {
9,779✔
1567
    code = TSDB_CODE_DNODE_OFFLINE;
263✔
1568
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
263✔
1569
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
1570
    goto _OVER;
263✔
1571
  }
1572

1573
  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
9,516✔
1574
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
9,516✔
1575

1576
  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
9,516✔
1577
    char obj1[30] = {0};
9,516✔
1578
    (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
9,516✔
1579

1580
    int64_t tse = taosGetTimestampMs();
9,516✔
1581
    double  duration = (double)(tse - tss);
9,516✔
1582
    duration = duration / 1000;
9,516✔
1583
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
9,516✔
1584
  }
1585

1586
_OVER:
10,389✔
1587
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
10,389✔
1588
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
1,660✔
1589
  }
1590

1591
  mndReleaseDnode(pMnode, pDnode);
10,389✔
1592
  mndReleaseMnode(pMnode, pMObj);
10,389✔
1593
  mndReleaseQnode(pMnode, pQObj);
10,389✔
1594
  mndReleaseBnode(pMnode, pBObj);
10,389✔
1595
  mndReleaseSnode(pMnode, pSObj);
10,389✔
1596
  tFreeSDropDnodeReq(&dropReq);
10,389✔
1597
  TAOS_RETURN(code);
10,389✔
1598
}
1599

1600
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
175✔
1601
  int32_t code = 0;
175✔
1602
  SMnode *pMnode = pReq->info.node;
175✔
1603
  SSdb   *pSdb = pMnode->pSdb;
175✔
1604
  void   *pIter = NULL;
175✔
1605
  int8_t  encrypting = 0;
175✔
1606

1607
  const STraceId *trace = &pReq->info.traceId;
175✔
1608

1609
  int32_t klen = strlen(pDcfgReq->value);
175✔
1610
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
175✔
UNCOV
1611
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
UNCOV
1612
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
1613
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
UNCOV
1614
    goto _exit;
×
1615
  }
1616

1617
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
175✔
UNCOV
1618
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
UNCOV
1619
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
×
UNCOV
1620
    goto _exit;
×
1621
  }
1622

1623
  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
175✔
1624
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
UNCOV
1625
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
×
UNCOV
1626
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
×
1627
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
UNCOV
1628
    goto _exit;
×
1629
  }
1630

1631
  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
175✔
1632
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
175✔
1633
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);
175✔
1634

1635
  while (1) {
175✔
1636
    SDnodeObj *pDnode = NULL;
350✔
1637
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
350✔
1638
    if (pIter == NULL) break;
350✔
1639
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
175✔
UNCOV
1640
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
1641
             offlineReason[pDnode->offlineReason]);
UNCOV
1642
      sdbRelease(pSdb, pDnode);
×
UNCOV
1643
      continue;
×
1644
    }
1645

1646
    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
175✔
1647
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
175✔
1648
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
175✔
1649
      void   *pBuf = rpcMallocCont(bufLen);
175✔
1650

1651
      if (pBuf != NULL) {
175✔
1652
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
175✔
UNCOV
1653
          code = bufLen;
×
UNCOV
1654
          sdbRelease(pSdb, pDnode);
×
UNCOV
1655
          goto _exit;
×
1656
        }
1657
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
175✔
1658
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
175✔
1659
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
175✔
1660
        }
1661
      }
1662
    }
1663

1664
    sdbRelease(pSdb, pDnode);
175✔
1665
  }
1666

1667
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
175✔
UNCOV
1668
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
×
1669
  }
1670

1671
_exit:
175✔
1672
  if (code != 0) {
175✔
1673
    if (terrno == 0) terrno = code;
×
1674
  }
1675
  return code;
175✔
1676
}
1677

1678
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
175✔
1679
  int32_t code = 0;
175✔
1680

1681
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
1682
  SMnode       *pMnode = pReq->info.node;
175✔
1683
  SMCfgDnodeReq cfgReq = {0};
175✔
1684
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
175✔
1685

1686
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
175✔
1687
    tFreeSMCfgDnodeReq(&cfgReq);
×
UNCOV
1688
    TAOS_RETURN(code);
×
1689
  }
1690
  const STraceId *trace = &pReq->info.traceId;
175✔
1691
  SDCfgDnodeReq   dcfgReq = {0};
175✔
1692
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
175✔
1693
    tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
175✔
1694
    tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
175✔
1695
    tFreeSMCfgDnodeReq(&cfgReq);
175✔
1696
    return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
175✔
1697
  } else {
UNCOV
1698
    code = TSDB_CODE_MND_INTERNAL_ERROR;
×
UNCOV
1699
    tFreeSMCfgDnodeReq(&cfgReq);
×
UNCOV
1700
    TAOS_RETURN(code);
×
1701
  }
1702

1703
#else
1704
  TAOS_RETURN(code);
1705
#endif
1706
}
1707

1708
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
175✔
1709
  SMnode *pMnode = pRsp->info.node;
175✔
1710
  int16_t nSuccess = 0;
175✔
1711
  int16_t nFailed = 0;
175✔
1712

1713
  if (0 == pRsp->code) {
175✔
1714
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
175✔
1715
  } else {
1716
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
×
1717
  }
1718

1719
  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
175✔
1720
  bool    finished = nSuccess + nFailed >= nReq;
175✔
1721

1722
  if (finished) {
175✔
1723
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
175✔
1724
  }
1725

1726
  const STraceId *trace = &pRsp->info.traceId;
175✔
1727
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
175✔
1728
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");
1729

1730
  return 0;
175✔
1731
}
1732

1733
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
1,183✔
1734
  SMnode *pMnode = pReq->info.node;
1,183✔
1735
  int32_t totalRows = 0;
1,183✔
1736
  int32_t numOfRows = 0;
1,183✔
1737
  char   *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
1,183✔
1738
  char    cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
1,183✔
1739
  char   *pWrite = NULL;
1,183✔
1740
  int32_t cols = 0;
1,183✔
1741
  int32_t code = 0;
1,183✔
1742
  int32_t lino = 0;
1,183✔
1743

1744
  cfgOpts[totalRows] = "statusIntervalMs";
1,183✔
1745
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);
1,183✔
1746
  totalRows++;
1,183✔
1747

1748
  cfgOpts[totalRows] = "timezone";
1,183✔
1749
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
1,183✔
1750
  totalRows++;
1,183✔
1751

1752
  cfgOpts[totalRows] = "locale";
1,183✔
1753
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
1,183✔
1754
  totalRows++;
1,183✔
1755

1756
  cfgOpts[totalRows] = "charset";
1,183✔
1757
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
1,183✔
1758
  totalRows++;
1,183✔
1759

1760
  cfgOpts[totalRows] = "monitor";
1,183✔
1761
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
1,183✔
1762
  totalRows++;
1,183✔
1763

1764
  cfgOpts[totalRows] = "monitorInterval";
1,183✔
1765
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
1,183✔
1766
  totalRows++;
1,183✔
1767

1768
  cfgOpts[totalRows] = "slowLogThreshold";
1,183✔
1769
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
1,183✔
1770
  totalRows++;
1,183✔
1771

1772
  cfgOpts[totalRows] = "slowLogMaxLen";
1,183✔
1773
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
1,183✔
1774
  totalRows++;
1,183✔
1775

1776
  char scopeStr[64] = {0};
1,183✔
1777
  getSlowLogScopeString(tsSlowLogScope, scopeStr);
1,183✔
1778
  cfgOpts[totalRows] = "slowLogScope";
1,183✔
1779
  (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
1,183✔
1780
  totalRows++;
1,183✔
1781

1782
  char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
1,183✔
1783
  char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
1,183✔
1784

1785
  for (int32_t i = 0; i < totalRows; i++) {
11,830✔
1786
    cols = 0;
10,647✔
1787

1788
    STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
10,647✔
1789
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
10,647✔
1790
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)buf, false), &lino, _OVER);
10,647✔
1791

1792
    STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
10,647✔
1793
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
10,647✔
1794
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)bufVal, false), &lino, _OVER);
10,647✔
1795

1796
    numOfRows++;
10,647✔
1797
  }
1798

1799
_OVER:
1,183✔
1800
  if (code != 0) mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
1,183✔
1801
  pShow->numOfRows += numOfRows;
1,183✔
1802
  return numOfRows;
1,183✔
1803
}
1804

UNCOV
1805
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
×
1806

1807
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
754,765✔
1808
  SMnode    *pMnode = pReq->info.node;
754,765✔
1809
  SSdb      *pSdb = pMnode->pSdb;
754,765✔
1810
  int32_t    numOfRows = 0;
754,765✔
1811
  int32_t    cols = 0;
754,765✔
1812
  ESdbStatus objStatus = 0;
754,765✔
1813
  SDnodeObj *pDnode = NULL;
754,765✔
1814
  int64_t    curMs = taosGetTimestampMs();
754,765✔
1815
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
754,608✔
1816
  int32_t    code = 0;
754,765✔
1817
  int32_t    lino = 0;
754,765✔
1818

1819
  while (numOfRows < rows) {
2,651,793✔
1820
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
2,651,793✔
1821
    if (pShow->pIter == NULL) break;
2,651,793✔
1822
    bool online = mndIsDnodeOnline(pDnode, curMs);
1,897,028✔
1823

1824
    cols = 0;
1,897,028✔
1825

1826
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1827
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);
1,897,028✔
1828

1829
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
1,897,028✔
1830

1831
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1832
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
1,897,028✔
1833

1834
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1835
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
1,897,028✔
1836
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);
1,897,028✔
1837

1838
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1839
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
1,897,028✔
1840
                        &lino, _OVER);
1841

1842
    const char *status = "ready";
1,897,028✔
1843
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
1,897,028✔
1844
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
1,897,028✔
1845
    if (!online) {
1,897,028✔
1846
      if (objStatus == SDB_STATUS_CREATING)
207,573✔
UNCOV
1847
        status = "creating*";
×
1848
      else if (objStatus == SDB_STATUS_DROPPING)
207,573✔
UNCOV
1849
        status = "dropping*";
×
1850
      else
1851
        status = "offline";
207,573✔
1852
    }
1853

1854
    STR_TO_VARSTR(buf, status);
1,897,028✔
1855
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1856
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
1,897,028✔
1857

1858
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1859
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
1,897,028✔
1860
                        _OVER);
1861

1862
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1863
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
1,897,028✔
1864
                        _OVER);
1865

1866
    char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
1,897,028✔
1867
    STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
1,897,028✔
1868

1869
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1870
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, b, false), pDnode, &lino, _OVER);
1,897,028✔
1871
    taosMemoryFreeClear(b);
1,897,028✔
1872

1873
#ifdef TD_ENTERPRISE
1874
    STR_TO_VARSTR(buf, pDnode->machineId);
1,897,028✔
1875
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,897,028✔
1876
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
1,897,028✔
1877
#endif
1878

1879
    numOfRows++;
1,897,028✔
1880
    sdbRelease(pSdb, pDnode);
1,897,028✔
1881
  }
1882

1883
_OVER:
754,608✔
1884
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));
754,765✔
1885

1886
  pShow->numOfRows += numOfRows;
754,765✔
1887
  return numOfRows;
754,765✔
1888
}
1889

UNCOV
1890
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
×
UNCOV
1891
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
1892
  sdbCancelFetchByType(pSdb, pIter, SDB_DNODE);
×
UNCOV
1893
}
×
1894

UNCOV
1895
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
×
UNCOV
1896
  int32_t    code = 0;
×
UNCOV
1897
  SDnodeObj *pObj = NULL;
×
UNCOV
1898
  void      *pIter = NULL;
×
UNCOV
1899
  SSdb      *pSdb = pMnode->pSdb;
×
UNCOV
1900
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
×
UNCOV
1901
  if (fqdns == NULL) {
×
UNCOV
1902
    mError("failed to init fqdns array");
×
UNCOV
1903
    return NULL;
×
1904
  }
1905

UNCOV
1906
  while (1) {
×
UNCOV
1907
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
×
1908
    if (pIter == NULL) break;
×
1909

1910
    char *fqdn = taosStrdup(pObj->fqdn);
×
UNCOV
1911
    if (fqdn == NULL) {
×
UNCOV
1912
      sdbRelease(pSdb, pObj);
×
UNCOV
1913
      mError("failed to strdup fqdn:%s", pObj->fqdn);
×
1914

UNCOV
1915
      code = terrno;
×
UNCOV
1916
      break;
×
1917
    }
1918

UNCOV
1919
    if (taosArrayPush(fqdns, &fqdn) == NULL) {
×
UNCOV
1920
      mError("failed to fqdn into array, but continue at this time");
×
1921
    }
UNCOV
1922
    sdbRelease(pSdb, pObj);
×
1923
  }
1924

UNCOV
1925
_error:
×
UNCOV
1926
  if (code != 0) {
×
UNCOV
1927
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
×
UNCOV
1928
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
×
UNCOV
1929
      taosMemoryFreeClear(pFqdn);
×
1930
    }
UNCOV
1931
    taosArrayDestroy(fqdns);
×
UNCOV
1932
    fqdns = NULL;
×
1933
  }
1934

UNCOV
1935
  return fqdns;
×
1936
}
1937

1938
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
573,829✔
1939
  SMnode     *pMnode = pReq->info.node;
573,829✔
1940
  SKeySyncReq req = {0};
573,829✔
1941
  SKeySyncRsp rsp = {0};
573,829✔
1942
  int32_t     code = TSDB_CODE_SUCCESS;
573,829✔
1943

1944
  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
573,829✔
1945
  if (code != 0) {
573,829✔
UNCOV
1946
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
×
UNCOV
1947
    goto _OVER;
×
1948
  }
1949

1950
  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);
573,829✔
1951

1952
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1953
  // Load mnode's encryption keys
1954
  char masterKeyFile[PATH_MAX] = {0};
573,829✔
1955
  snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
573,829✔
1956
           TD_DIRSEP);
1957
  char derivedKeyFile[PATH_MAX] = {0};
573,829✔
1958
  snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
573,829✔
1959
           TD_DIRSEP);
1960
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
573,829✔
1961
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
573,829✔
1962
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
573,829✔
1963
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
573,829✔
1964
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
573,829✔
1965
  int32_t algorithm = 0;
573,829✔
1966
  int32_t cfgAlgorithm = 0;
573,829✔
1967
  int32_t metaAlgorithm = 0;
573,829✔
1968
  int32_t fileVersion = 0;
573,829✔
1969
  int32_t keyVersion = 0;
573,829✔
1970
  int64_t createTime = 0;
573,829✔
1971
  int64_t svrKeyUpdateTime = 0;
573,829✔
1972
  int64_t dbKeyUpdateTime = 0;
573,829✔
1973

1974
  if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
573,829✔
1975
    keyVersion = tsEncryptKeyVersion;
1,534✔
1976
    tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
1,534✔
1977
    tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
1,534✔
1978
    tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
1,534✔
1979
    tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
1,534✔
1980
    tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
1,534✔
1981
    algorithm = tsEncryptAlgorithmType;
1,534✔
1982
    cfgAlgorithm = tsCfgAlgorithm;
1,534✔
1983
    metaAlgorithm = tsMetaAlgorithm;
1,534✔
1984
    fileVersion = tsEncryptFileVersion;
1,534✔
1985
    createTime = tsEncryptKeyCreateTime;
1,534✔
1986
    svrKeyUpdateTime = tsSvrKeyUpdateTime;
1,534✔
1987
    dbKeyUpdateTime = tsDbKeyUpdateTime;
1,534✔
1988
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_LOADED;
1,534✔
1989
  } else {
1990
    rsp.encryptionKeyStatus = TSDB_ENCRYPT_KEY_STAT_DISABLED;
572,295✔
1991
  }
1992

1993
  // Check if dnode needs update
1994
  if (req.keyVersion != keyVersion) {
573,829✔
1995
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);
1,534✔
1996

1997
    rsp.keyVersion = keyVersion;
1,534✔
1998
    rsp.needUpdate = 1;
1,534✔
1999
    tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
1,534✔
2000
    tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
1,534✔
2001
    tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
1,534✔
2002
    tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
1,534✔
2003
    tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
1,534✔
2004
    rsp.algorithm = algorithm;
1,534✔
2005
    rsp.createTime = createTime;
1,534✔
2006
    rsp.svrKeyUpdateTime = svrKeyUpdateTime;
1,534✔
2007
    rsp.dbKeyUpdateTime = dbKeyUpdateTime;
1,534✔
2008
  } else {
2009
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
572,295✔
2010
    rsp.keyVersion = keyVersion;
572,295✔
2011
    rsp.needUpdate = 0;
572,295✔
2012
  }
2013
#else
2014
  // Community edition - no encryption support
2015
  mWarn("enterprise features not enabled, key sync not supported");
2016
  rsp.keyVersion = 0;
2017
  rsp.needUpdate = 0;
2018
#endif
2019

2020
  int32_t contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
573,829✔
2021
  if (contLen < 0) {
573,829✔
UNCOV
2022
    code = contLen;
×
UNCOV
2023
    goto _OVER;
×
2024
  }
2025

2026
  void *pHead = rpcMallocCont(contLen);
573,829✔
2027
  if (pHead == NULL) {
573,829✔
UNCOV
2028
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
2029
    goto _OVER;
×
2030
  }
2031

2032
  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
573,829✔
2033
  if (contLen < 0) {
573,829✔
UNCOV
2034
    rpcFreeCont(pHead);
×
UNCOV
2035
    code = contLen;
×
UNCOV
2036
    goto _OVER;
×
2037
  }
2038

2039
  pReq->info.rspLen = contLen;
573,829✔
2040
  pReq->info.rsp = pHead;
573,829✔
2041

2042
_OVER:
573,829✔
2043
  if (code != 0) {
573,829✔
UNCOV
2044
    mError("failed to process key sync req, since %s", tstrerror(code));
×
2045
  }
2046
  return code;
573,829✔
2047
}
2048

UNCOV
2049
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) { return 0; }
×
2050

UNCOV
2051
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
×
UNCOV
2052
  if (p == NULL) return NULL;
×
2053

UNCOV
2054
  switch (type) {
×
UNCOV
2055
    case SDB_DNODE:
×
UNCOV
2056
      return (SDnodeObj *)p;
×
UNCOV
2057
    case SDB_QNODE:
×
UNCOV
2058
      return ((SQnodeObj *)p)->pDnode;
×
UNCOV
2059
    case SDB_SNODE:
×
UNCOV
2060
      return ((SSnodeObj *)p)->pDnode;
×
UNCOV
2061
    case SDB_BNODE:
×
UNCOV
2062
      return ((SBnodeObj *)p)->pDnode;
×
UNCOV
2063
    default:
×
UNCOV
2064
      break;
×
2065
  }
UNCOV
2066
  return NULL;
×
2067
}
UNCOV
2068
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
×
UNCOV
2069
  int32_t lino = 0;
×
UNCOV
2070
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
2071
  void   *pIter = NULL;
×
UNCOV
2072
  int32_t code = 0;
×
2073

UNCOV
2074
  while (1) {
×
UNCOV
2075
    void *pObj = NULL;
×
UNCOV
2076
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
×
UNCOV
2077
    if (pIter == NULL) break;
×
2078

UNCOV
2079
    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
×
UNCOV
2080
    if (pDnodeObj == NULL) {
×
UNCOV
2081
      mError("null dnode object for type:%d", type);
×
UNCOV
2082
      sdbRelease(pSdb, pObj);
×
2083
      continue;
×
2084
    }
2085

UNCOV
2086
    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
×
UNCOV
2087
    if (taosArrayPush(pAddr, &epSet) == NULL) {
×
UNCOV
2088
      mError("failed to push addr into array");
×
2089
      sdbRelease(pSdb, pObj);
×
2090
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2091
    }
UNCOV
2092
    sdbRelease(pSdb, pObj);
×
2093
  }
2094

2095
_exit:
×
2096
  return code;
×
2097
}
2098

UNCOV
2099
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
×
UNCOV
2100
  int32_t lino = 0;
×
UNCOV
2101
  int32_t code = 0;
×
UNCOV
2102
  if (pMnode == NULL || pAddr == NULL) {
×
UNCOV
2103
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _error);
×
2104
  }
2105

UNCOV
2106
  code = mndGetAllNodeAddrByType(pMnode, SDB_QNODE, pAddr);
×
UNCOV
2107
  TAOS_CHECK_GOTO(code, &lino, _error);
×
2108

UNCOV
2109
  code = mndGetAllNodeAddrByType(pMnode, SDB_SNODE, pAddr);
×
2110
  TAOS_CHECK_GOTO(code, &lino, _error);
×
2111

2112
  code = mndGetAllNodeAddrByType(pMnode, SDB_BNODE, pAddr);
×
2113
  TAOS_CHECK_GOTO(code, &lino, _error);
×
2114

2115
  code = mndGetAllNodeAddrByType(pMnode, SDB_DNODE, pAddr);
×
2116
  TAOS_CHECK_GOTO(code, &lino, _error);
×
2117

2118
_error:
×
2119
  return code;
×
2120
}
2121

2122
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
×
2123
  int32_t code = 0;
×
2124

2125
  SMnode *pMnode = pReq->info.node;
×
UNCOV
2126
  void   *pIter = NULL;
×
2127
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
2128
  mInfo("start to reload dnode tls config");
×
2129

2130
  SMCfgDnodeReq req = {0};
×
2131
  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
×
2132
    goto _OVER;
×
2133
  }
2134

2135
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
×
2136
    goto _OVER;
×
2137
  }
2138

UNCOV
2139
  SArray *pAddr = taosArrayInit(4, sizeof(SEpSet));
×
2140
  if (pAddr == NULL) {
×
2141
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
×
2142
  }
2143

2144
  code = mndGetAllNodeAddr(pMnode, pAddr);
×
2145

UNCOV
2146
  for (int32_t i = 0; i < taosArrayGetSize(pAddr); i++) {
×
2147
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
×
2148
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
×
2149
    code = tmsgSendReq(pEpSet, &rpcMsg);
×
2150
    if (code != 0) {
×
2151
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(code));
×
2152
    }
2153
  }
2154

UNCOV
2155
_OVER:
×
2156
  tFreeSMCfgDnodeReq(&req);
×
2157
  taosArrayDestroy(pAddr);
×
UNCOV
2158
  return code;
×
2159
}
2160

2161
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
×
2162
  int32_t code = 0;
×
2163
  if (pRsp->code != 0) {
×
2164
    mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
×
2165
  } else {
UNCOV
2166
    mInfo("succeed to reload dnode tls config");
×
2167
  }
2168
  return code;
×
2169
}
2170

2171
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
×
UNCOV
2172
  int32_t code = 0;
×
2173
  SMnode *pMnode = pReq->info.node;
×
2174
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
2175
  void   *pIter = NULL;
×
2176

2177
  const STraceId *trace = &pReq->info.traceId;
×
2178

2179
  // Validate key type
2180
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
×
UNCOV
2181
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
×
2182
            pAlterReq->keyType);
2183
    return TSDB_CODE_INVALID_PARA;
×
2184
  }
2185

2186
  // Validate new key length
2187
  int32_t klen = strlen(pAlterReq->newKey);
×
2188
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
×
2189
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
×
2190
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
2191
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
×
2192
  }
2193

2194
  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
UNCOV
2195
  SMAlterEncryptKeyReq alterKeyReq = {0};
×
2196
  alterKeyReq.keyType = pAlterReq->keyType;
×
2197
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
×
UNCOV
2198
  alterKeyReq.sqlLen = 0;
×
UNCOV
2199
  alterKeyReq.sql = NULL;
×
2200

2201
  // Send request to all online dnodes
2202
  while (1) {
×
UNCOV
2203
    SDnodeObj *pDnode = NULL;
×
UNCOV
2204
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
2205
    if (pIter == NULL) break;
×
2206

2207
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
×
2208
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
2209
             offlineReason[pDnode->offlineReason]);
2210
      sdbRelease(pSdb, pDnode);
×
2211
      continue;
×
2212
    }
2213

UNCOV
2214
    SEpSet  epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
2215
    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
×
2216
    void   *pBuf = rpcMallocCont(bufLen);
×
2217

2218
    if (pBuf != NULL) {
×
2219
      if ((bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, bufLen, &alterKeyReq)) <= 0) {
×
UNCOV
2220
        code = bufLen;
×
UNCOV
2221
        sdbRelease(pSdb, pDnode);
×
2222
        goto _exit;
×
2223
      }
2224
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
×
2225
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
×
UNCOV
2226
      if (ret != 0) {
×
2227
        mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
×
2228
      } else {
2229
        mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
×
2230
      }
2231
    }
2232

2233
    sdbRelease(pSdb, pDnode);
×
2234
  }
2235

2236
  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
2237
  // when it receives the alter encrypt key request from mnode
2238
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);
×
2239

UNCOV
2240
_exit:
×
2241
  if (code != 0) {
×
2242
    if (terrno == 0) terrno = code;
×
2243
  }
2244
  return code;
×
2245
}
2246

UNCOV
2247
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
×
2248
  SMnode              *pMnode = pReq->info.node;
×
2249
  SMAlterEncryptKeyReq alterReq = {0};
×
2250
  int32_t              code = TSDB_CODE_SUCCESS;
×
UNCOV
2251
  int32_t              lino = 0;
×
2252

2253
  // Check privilege - only admin can alter encryption keys
UNCOV
2254
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
×
2255
                  &lino, _OVER);
2256

2257
  // Deserialize request
2258
  code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq);
×
2259
  if (code != 0) {
×
2260
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
×
UNCOV
2261
    goto _OVER;
×
2262
  }
2263

2264
  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);
×
2265

2266
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
2267
  // Process and distribute to all dnodes
2268
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq);
×
2269
  if (code == 0) {
×
2270
    // Audit log
2271
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", alterReq.keyType == 0 ? "SVR_KEY" : "DB_KEY",
×
2272
                alterReq.sql, alterReq.sqlLen, 0, 0);
2273
  }
2274
#else
2275
  // Community edition - no encryption support
2276
  mError("encryption key management is only available in enterprise edition");
2277
  code = TSDB_CODE_OPS_NOT_SUPPORT;
2278
#endif
2279

2280
_OVER:
×
2281
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
2282
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
×
2283
  }
2284

2285
  tFreeSMAlterEncryptKeyReq(&alterReq);
×
2286
  TAOS_RETURN(code);
×
2287
}
2288

UNCOV
2289
static int32_t mndProcessAlterKeyExpirationReqImpl(SRpcMsg *pReq, SMAlterKeyExpirationReq *pAlterReq) {
×
2290
  int32_t code = 0;
×
UNCOV
2291
  SMnode *pMnode = pReq->info.node;
×
UNCOV
2292
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
2293
  void   *pIter = NULL;
×
2294

UNCOV
2295
  const STraceId *trace = &pReq->info.traceId;
×
2296

2297
  // Validate days value
UNCOV
2298
  if (pAlterReq->days < 0) {
×
2299
    mGError("msg:%p, failed to alter key expiration since invalid days:%d, must be >= 0", pReq, pAlterReq->days);
×
UNCOV
2300
    return TSDB_CODE_INVALID_PARA;
×
2301
  }
2302

2303
  // Validate strategy
UNCOV
2304
  if (strlen(pAlterReq->strategy) == 0) {
×
2305
    mGError("msg:%p, failed to alter key expiration since empty strategy", pReq);
×
UNCOV
2306
    return TSDB_CODE_INVALID_PARA;
×
2307
  }
2308

2309
  // Prepare SMAlterKeyExpirationReq for distribution to dnodes
2310
  SMAlterKeyExpirationReq alterReq = {0};
×
2311
  alterReq.days = pAlterReq->days;
×
2312
  tstrncpy(alterReq.strategy, pAlterReq->strategy, sizeof(alterReq.strategy));
×
UNCOV
2313
  alterReq.sqlLen = 0;
×
UNCOV
2314
  alterReq.sql = NULL;
×
2315

2316
  // Send request to all online dnodes
UNCOV
2317
  while (1) {
×
UNCOV
2318
    SDnodeObj *pDnode = NULL;
×
2319
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
2320
    if (pIter == NULL) break;
×
2321

2322
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
×
UNCOV
2323
      mGWarn("msg:%p, don't send alter key_expiration req since dnode:%d in offline state:%s", pReq, pDnode->id,
×
2324
             offlineReason[pDnode->offlineReason]);
2325
      sdbRelease(pSdb, pDnode);
×
UNCOV
2326
      continue;
×
2327
    }
2328

2329
    SEpSet  epSet = mndGetDnodeEpset(pDnode);
×
2330
    int32_t bufLen = tSerializeSMAlterKeyExpirationReq(NULL, 0, &alterReq);
×
UNCOV
2331
    void   *pBuf = rpcMallocCont(bufLen);
×
2332

UNCOV
2333
    if (pBuf != NULL) {
×
UNCOV
2334
      if ((bufLen = tSerializeSMAlterKeyExpirationReq(pBuf, bufLen, &alterReq)) <= 0) {
×
UNCOV
2335
        code = bufLen;
×
UNCOV
2336
        sdbRelease(pSdb, pDnode);
×
UNCOV
2337
        goto _exit;
×
2338
      }
UNCOV
2339
      SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_KEY_EXPIRATION, .pCont = pBuf, .contLen = bufLen};
×
UNCOV
2340
      int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
×
2341
      if (ret != 0) {
×
2342
        mGError("msg:%p, failed to send alter key_expiration req to dnode:%d, error:%s", pReq, pDnode->id,
×
2343
                tstrerror(ret));
2344
      } else {
UNCOV
2345
        mGInfo("msg:%p, send alter key_expiration req to dnode:%d, days:%d, strategy:%s", pReq, pDnode->id,
×
2346
               pAlterReq->days, pAlterReq->strategy);
2347
      }
2348
    }
2349

2350
    sdbRelease(pSdb, pDnode);
×
2351
  }
2352

2353
  mGInfo("msg:%p, successfully sent alter key expiration request to all dnodes, days:%d, strategy:%s", pReq,
×
2354
         pAlterReq->days, pAlterReq->strategy);
2355

2356
_exit:
×
UNCOV
2357
  if (code != 0) {
×
UNCOV
2358
    if (terrno == 0) terrno = code;
×
2359
  }
2360
  return code;
×
2361
}
2362

UNCOV
2363
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq) {
×
UNCOV
2364
  SMnode                 *pMnode = pReq->info.node;
×
2365
  SMAlterKeyExpirationReq alterReq = {0};
×
2366
  int32_t                 code = TSDB_CODE_SUCCESS;
×
2367
  int32_t                 lino = 0;
×
2368

2369
  // Check privilege - only admin can alter key expiration
UNCOV
2370
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
×
2371
                  &lino, _OVER);
2372

2373
  // Deserialize request
2374
  code = tDeserializeSMAlterKeyExpirationReq(pReq->pCont, pReq->contLen, &alterReq);
×
2375
  if (code != 0) {
×
UNCOV
2376
    mError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
×
UNCOV
2377
    goto _OVER;
×
2378
  }
2379

2380
  mInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);
×
2381

2382
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
2383
  // Process and distribute to all dnodes
2384
  code = mndProcessAlterKeyExpirationReqImpl(pReq, &alterReq);
×
UNCOV
2385
  if (code == 0) {
×
2386
    // Audit log
2387
    char detail[128];
×
UNCOV
2388
    snprintf(detail, sizeof(detail), "%d DAYS %s", alterReq.days, alterReq.strategy);
×
UNCOV
2389
    auditRecord(pReq, pMnode->clusterId, "alterKeyExpiration", "", detail, alterReq.sql, alterReq.sqlLen, 0, 0);
×
2390
  }
2391
#else
2392
  // Community edition - no encryption support
2393
  mError("key expiration management is only available in enterprise edition");
2394
  code = TSDB_CODE_OPS_NOT_SUPPORT;
2395
#endif
2396

2397
_OVER:
×
2398
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2399
    mError("failed to alter key expiration, days:%d, strategy:%s, since %s", alterReq.days, alterReq.strategy,
×
2400
           tstrerror(code));
2401
  }
2402

2403
  tFreeSMAlterKeyExpirationReq(&alterReq);
×
UNCOV
2404
  TAOS_RETURN(code);
×
2405
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc