• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4991

17 Mar 2026 07:57AM UTC coverage: 69.756% (+0.4%) from 69.348%
#4991

push

travis-ci

web-flow
merge: from main to 3.0 branch #34807

14 of 16 new or added lines in 5 files covered. (87.5%)

3928 existing lines in 138 files now uncovered.

192146 of 275455 relevant lines covered (69.76%)

137208686.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.75
/source/dnode/mnode/impl/src/mndDnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include <stdio.h>
19
#include "audit.h"
20
#include "mndBnode.h"
21
#include "mndCluster.h"
22
#include "mndDb.h"
23
#include "mndMnode.h"
24
#include "mndMount.h"
25
#include "mndPrivilege.h"
26
#include "mndQnode.h"
27
#include "mndShow.h"
28
#include "mndSnode.h"
29
#include "mndToken.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "taos_monitor.h"
34
#include "tconfig.h"
35
#include "tjson.h"
36
#include "tmisce.h"
37
#include "tunit.h"
38
#if defined(TD_ENTERPRISE)
39
#include "taoskInt.h"
40
#endif
41

42
#define TSDB_DNODE_VER_NUMBER   2
43
#define TSDB_DNODE_RESERVE_SIZE 40
44

45
static const char *offlineReason[] = {
46
    "",
47
    "status msg timeout",
48
    "status not received",
49
    "version not match",
50
    "dnodeId not match",
51
    "clusterId not match",
52
    "statusInterval not match",
53
    "timezone not match",
54
    "locale not match",
55
    "charset not match",
56
    "ttlChangeOnWrite not match",
57
    "enableWhiteList not match",
58
    "encryptionKey not match",
59
    "monitor not match",
60
    "monitor switch not match",
61
    "monitor interval not match",
62
    "monitor slow log threshold not match",
63
    "monitor slow log sql max len not match",
64
    "monitor slow log scope not match",
65
    "unknown",
66
};
67

68
enum {
69
  DND_ACTIVE_CODE,
70
  DND_CONN_ACTIVE_CODE,
71
};
72

73
enum {
74
  DND_CREATE,
75
  DND_ADD,
76
  DND_DROP,
77
};
78

79
static int32_t  mndCreateDefaultDnode(SMnode *pMnode);
80
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
81
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
82
static int32_t  mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
83
static int32_t  mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
84
static int32_t  mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
85
static int32_t  mndProcessDnodeListReq(SRpcMsg *pReq);
86

87
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
88
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
89
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
90
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
91
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
92
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
93
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
94
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq);
95
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
96
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
97
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
98

99
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
100
static void    mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
101
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
102
static void    mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
103

104
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq);
105
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq);
106
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq);
107
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp);
108
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq);
109
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq);
110

111
#ifdef _GRANT
112
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
113
#else
114
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
115
#endif
116

117
/**
 * Register everything the mnode needs in order to manage dnodes:
 * the SDB table callbacks for SDB_DNODE rows, the RPC message handlers
 * and the SHOW retrieve / iterator-free handlers.
 *
 * @param pMnode mnode instance the handlers are installed on.
 * @return whatever sdbSetTable() returns for the dnode table.
 */
int32_t mndInitDnode(SMnode *pMnode) {
  // Callbacks the SDB uses for SDB_DNODE rows (keyed by an int32 id).
  SSdbTable dnodeTable = {0};
  dnodeTable.sdbType = SDB_DNODE;
  dnodeTable.keyType = SDB_KEY_INT32;
  dnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultDnode;
  dnodeTable.encodeFp = (SdbEncodeFp)mndDnodeActionEncode;
  dnodeTable.decodeFp = (SdbDecodeFp)mndDnodeActionDecode;
  dnodeTable.insertFp = (SdbInsertFp)mndDnodeActionInsert;
  dnodeTable.updateFp = (SdbUpdateFp)mndDnodeActionUpdate;
  dnodeTable.deleteFp = (SdbDeleteFp)mndDnodeActionDelete;

  // Dnode lifecycle, status and monitoring messages.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
  mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_BATCH_AUDIT, mndProcessBatchAuditReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DNODE_RELOAD_TLS, mndProcessUpdateDnodeReloadTls);
  mndSetMsgHandle(pMnode, TDMT_DND_RELOAD_DNODE_TLS_RSP, mndProcessReloadDnodeTlsRsp);

  // SHOW handlers for the configs and dnode system tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);

  // Key synchronisation / encryption-key management messages.
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC, mndProcessKeySyncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KEY_SYNC_RSP, mndProcessKeySyncRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_ENCRYPT_KEY, mndProcessAlterEncryptKeyReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_KEY_EXPIRATION, mndProcessAlterKeyExpirationReq);

  return sdbSetTable(pMnode->pSdb, dnodeTable);
}
156

157
/**
 * Counterpart of mndInitDnode(); currently there is nothing to tear down.
 *
 * @param pMnode mnode instance (unused).
 */
void mndCleanupDnode(SMnode *pMnode) {
  (void)pMnode;
}
444,274✔
158

159
/**
 * Deploy callback of the dnode table (installed as deployFp by mndInitDnode):
 * builds dnode #1 from the local fqdn/port and submits it through a
 * "create-dnode" transaction.
 *
 * @param pMnode mnode instance the transaction is created on.
 * @return 0 on success; otherwise a non-zero error code. On the enterprise
 *         "no machine id" path both the return value and terrno are
 *         TSDB_CODE_DNODE_NO_MACHINE_CODE.
 */
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;
  char    *machineId = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = 1;
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = tsServerPort;
  tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
  (void)snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);

  code = tGetMachineId(&machineId);
  if (machineId) {
    (void)memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // Do not rely on whatever tGetMachineId() returned: make sure the caller
    // sees a failure that matches terrno.
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    goto _OVER;
#endif
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_READY), NULL, _OVER);
  // From here on the raw is no longer released by this function.
  pRaw = NULL;

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);
  return code;
}
210

211
/**
 * Serialise a dnode object into a freshly allocated SDB raw record.
 *
 * Layout written, in order: id, createdTime, updateTime, port, fqdn,
 * machineId, a reserved area of TSDB_DNODE_RESERVE_SIZE bytes and two
 * zero int16 fields kept for forward/backward compatibility.
 *
 * @param pDnode object to encode.
 * @return the raw record, or NULL on failure (terrno is then non-zero).
 */
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
  // code/lino are kept in scope for the SDB_SET_* helper macros.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Pessimistic default; cleared only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
  SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_INT16(pRaw, dataPos, 0, _OVER)  // forward/backward compatible
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
    return pRaw;
  }

  mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
243

244
/**
 * Rebuild a dnode row from an SDB raw record.
 *
 * Accepts soft versions 1..TSDB_DNODE_VER_NUMBER. For versions above 1 two
 * length-prefixed binary fields follow the reserved area; they are read and
 * discarded. After a successful decode the (id, fqdn, port) triple is passed
 * through tmsgUpdateDnodeInfo().
 *
 * @param pRaw raw record to decode.
 * @return the new row, or NULL on failure (terrno is then non-zero).
 */
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
  // code/lino are kept in scope for the SDB_GET_* helper macros.
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SDnodeObj *pDnode = NULL;

  // Pessimistic default; cleared only once every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SDnodeObj));
  if (pRow == NULL) goto _OVER;

  pDnode = sdbGetRowObj(pRow);
  if (pDnode == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
  SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
  if (sver > 1) {
    // Two length-prefixed fields that are skipped, not stored.
    int16_t skipLen = 0;
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &skipLen, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, NULL, skipLen, _OVER)
  }

  terrno = 0;
  if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
    mInfo("dnode:%d, endpoint changed", pDnode->id);
  }

_OVER:
  if (terrno == 0) {
    mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
    return pRow;
  }

  mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
295

296
/**
 * SDB insert callback for dnode rows: marks the dnode as "status not
 * received" and derives its "fqdn:port" endpoint string.
 *
 * @param pSdb   owning SDB (unused).
 * @param pDnode row being inserted.
 * @return always 0.
 */
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
  char endpoint[TSDB_EP_LEN] = {0};

  mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);

  // Format into a zeroed scratch buffer first, then copy into the row.
  (void)snprintf(endpoint, TSDB_EP_LEN - 1, "%s:%u", pDnode->fqdn, pDnode->port);
  pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
  tstrncpy(pDnode->ep, endpoint, TSDB_EP_LEN);

  return 0;
}
305

306
/**
 * SDB delete callback for dnode rows; only emits a trace line.
 *
 * @param pSdb   owning SDB (unused).
 * @param pDnode row being deleted.
 * @return always 0.
 */
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
  (void)pSdb;
  mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
  return 0;
}
310

311
/**
 * SDB update callback for dnode rows: carries updateTime (and, in
 * TD_ENTERPRISE / TD_ASTRA_TODO builds, machineId) from the new row over to
 * the existing one.
 *
 * @param pSdb owning SDB (unused).
 * @param pOld row currently stored; modified in place.
 * @param pNew row holding the new values.
 * @return always 0.
 */
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
  (void)pSdb;
  mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pOld->machineId, pNew->machineId, TSDB_MACHINE_ID_LEN + 1);
#endif
  pOld->updateTime = pNew->updateTime;

  return 0;
}
319

320
/**
 * Acquire a reference to the dnode with the given id.
 *
 * On failure the SDB error left in terrno is translated to its dnode-level
 * equivalent:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_MND_DNODE_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_DNODE_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_DNODE_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode to look up.
 * @return the dnode object, or NULL with terrno set as described above.
 *         A non-NULL result is handed back via mndReleaseDnode().
 */
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
  if (pDnode != NULL) {
    return pDnode;
  }

  // Capture the SDB error before terrno is rewritten.
  int32_t sdbCode = terrno;
  if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_DNODE_IN_CREATING;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_DNODE_IN_DROPPING;
  } else {
    // Log the original cause; once terrno is replaced by the generic
    // TSDB_CODE_APP_ERROR the real reason would be lost.
    mFatal("dnode:%d, failed to acquire dnode since %s", dnodeId, tstrerror(sdbCode));
    terrno = TSDB_CODE_APP_ERROR;
  }

  return NULL;
}
338

339
/**
 * Hand a dnode object back to the SDB of the given mnode.
 *
 * @param pMnode mnode instance whose SDB receives the object.
 * @param pDnode dnode object to release.
 */
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
  sdbRelease(pMnode->pSdb, pDnode);
}
164,746,500✔
343

344
/**
 * Build an endpoint set from a dnode's fqdn and port.
 *
 * @param pDnode dnode whose endpoint is wanted.
 * @return the endpoint set; terrno receives the result of addEpIntoEpSet().
 */
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
  SEpSet  dnodeEpSet = {0};
  int32_t addCode = addEpIntoEpSet(&dnodeEpSet, pDnode->fqdn, pDnode->port);
  terrno = addCode;
  return dnodeEpSet;
}
349

350
/**
 * Look up a dnode by id and return its endpoint set.
 *
 * @param pMnode  mnode instance.
 * @param dnodeId id of the dnode.
 * @return the endpoint set, or a zeroed set when the dnode cannot be
 *         acquired (terrno is then the one set by mndAcquireDnode()).
 */
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    SEpSet emptySet = {0};
    return emptySet;
  }

  SEpSet resolved = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);
  return resolved;
}
360

361
/**
 * Find the dnode whose endpoint string equals pEpStr (case-insensitive,
 * compared over at most TSDB_EP_LEN characters).
 *
 * @param pMnode mnode instance.
 * @param pEpStr endpoint string to search for.
 * @return the matching dnode (still referenced; the caller releases it), or
 *         NULL with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST.
 */
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pCandidate = NULL;
  void      *pCursor = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pCandidate);

  while (pCursor != NULL) {
    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) == 0) {
      // Stop the iteration but keep the reference on the match.
      sdbCancelFetch(pSdb, pCursor);
      return pCandidate;
    }

    sdbRelease(pSdb, pCandidate);
    pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate);
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  return NULL;
}
381

382
/**
 * Same lookup as mndAcquireDnodeByEp(), but iterating with sdbFetchAll()
 * instead of sdbFetch(). Unlike that function, terrno is not set when
 * nothing matches.
 *
 * @param pMnode mnode instance.
 * @param pEpStr endpoint string to search for (case-insensitive).
 * @return the matching dnode (still referenced; the caller releases it), or
 *         NULL when no dnode matches.
 */
static SDnodeObj *mndAcquireDnodeAllStatusByEp(SMnode *pMnode, char *pEpStr) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SDnodeObj *pCandidate = NULL;
    ESdbStatus rowStatus = 0;

    pCursor = sdbFetchAll(pSdb, SDB_DNODE, pCursor, (void **)&pCandidate, &rowStatus, true);
    if (pCursor == NULL) {
      return NULL;
    }

    if (taosStrncasecmp(pEpStr, pCandidate->ep, TSDB_EP_LEN) != 0) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // Stop the iteration but keep the reference on the match.
    sdbCancelFetch(pSdb, pCursor);
    return pCandidate;
  }
}
402

403
/**
 * @param pMnode mnode instance.
 * @return sdbGetSize() for the SDB_DNODE table.
 */
int32_t mndGetDnodeSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DNODE);
}
407

408
/**
 * @param pMnode mnode instance.
 * @return sdbGetSize() for the SDB_DB table.
 */
int32_t mndGetDbSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_DB);
}
412

413
/**
 * Decide whether a dnode counts as online at time curMs.
 *
 * A dnode is online when |lastAccessTime - curMs| does not exceed
 * tsStatusTimeoutMs. When it is not, and the dnode has a positive
 * rebootTime while still flagged DND_REASON_ONLINE, its offlineReason is
 * switched to DND_REASON_STATUS_MSG_TIMEOUT as a side effect.
 *
 * @param pDnode dnode to test; offlineReason may be updated.
 * @param curMs  reference time in milliseconds.
 * @return true when online, false otherwise.
 */
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
  int64_t silenceMs = TABS(pDnode->lastAccessTime - curMs);
  if (silenceMs <= (int64_t)tsStatusTimeoutMs) {
    return true;
  }

  if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) {
    pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
  }
  return false;
}
423

424
/**
 * Append one SDnodeEp (id, fqdn, port, isMnode) per dnode row returned by
 * sdbFetchAll() to pDnodeEps.
 *
 * A failed push is logged and the iteration continues, so the resulting
 * array may be incomplete.
 *
 * @param pMnode    mnode instance.
 * @param pDnodeEps output array of SDnodeEp.
 */
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Copy everything needed out of the row, then drop the reference.
    SDnodeEp dnodeEp = {0};
    dnodeEp.id = pDnode->id;
    dnodeEp.ep.port = pDnode->port;
    tstrncpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release above; use the
    // id that was copied into dnodeEp instead.
    dnodeEp.isMnode = mndIsMnode(pMnode, dnodeEp.id) ? 1 : 0;

    if (taosArrayPush(pDnodeEps, &dnodeEp) == NULL) {
      mError("failed to put ep into array, but continue at this call");
    }
  }
}
2,227,993✔
450

451
/**
 * Append one SDnodeInfo (id, fqdn, port, offlineReason, isMnode) per dnode
 * row returned by sdbFetchAll() to pDnodeInfo.
 *
 * @param pMnode     mnode instance.
 * @param pDnodeInfo output array of SDnodeInfo.
 * @return 0 on success; terrno of the failed taosArrayPush() otherwise, in
 *         which case the iteration is cancelled and the array holds only
 *         the entries pushed so far.
 */
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = 0;
  void   *pIter = NULL;

  while (1) {
    SDnodeObj *pDnode = NULL;
    ESdbStatus objStatus = 0;
    pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
    if (pIter == NULL) break;

    // Zero-initialise so that no indeterminate bytes are copied into the
    // array, then copy what is needed and drop the reference.
    SDnodeInfo dInfo = {0};
    dInfo.id = pDnode->id;
    dInfo.ep.port = pDnode->port;
    dInfo.offlineReason = pDnode->offlineReason;
    tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
    sdbRelease(pSdb, pDnode);

    // pDnode must not be dereferenced after the release above; use the
    // id that was copied into dInfo instead.
    dInfo.isMnode = mndIsMnode(pMnode, dInfo.id) ? 1 : 0;

    if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) {
      code = terrno;
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }
  TAOS_RETURN(code);
}
483

484
/*
 * Compare one monitor parameter reported by a dnode (pCfg->monitorParas.<para>)
 * with the local global of the same name. On mismatch: log, set terrno and
 * return the offline reason `err` from the ENCLOSING function.
 *
 * Requires `pCfg` and `pDnode` to be in scope at the expansion site.
 *
 * terrno receives TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, matching the
 * tsSlowLogExceptDb check in mndCheckClusterCfgPara(); `err` is a
 * DND_REASON_* value and is only used as the return value.
 *
 * Wrapped in do { } while (0) so that the macro behaves as one statement.
 */
#define CHECK_MONITOR_PARA(para, err)                                                                      \
  do {                                                                                                     \
    if (pCfg->monitorParas.para != (para)) {                                                               \
      mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
      terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;                                                      \
      return (err);                                                                                        \
    }                                                                                                      \
  } while (0)
490

491
/**
 * Compare the cluster configuration reported by a dnode with the local one.
 *
 * Checks run in a fixed order and the first mismatch wins: monitor
 * parameters, slow-log except-db, timezone (only when checkTime also
 * differs), locale, charset, ttlChangeOnWrite, enableWhiteList and - unless
 * an encryption operation is in progress - the encryption key state and
 * checksum.
 *
 * @param pMnode mnode instance (checkTime and encryptMgmt are read).
 * @param pDnode dnode that sent the configuration; used for logging.
 * @param pCfg   configuration reported by the dnode.
 * @return DND_REASON_ONLINE when everything matches, otherwise the
 *         DND_REASON_* value of the first mismatch (terrno is set as well).
 */
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
  /* ---- monitor parameters ---- */
  CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
  CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
  CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

  if (taosStrcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb) != 0) {
    mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
           pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
    terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
    return DND_REASON_STATUS_MONITOR_NOT_MATCH;
  }

  // NOTE: statusIntervalMs is intentionally not compared here (the check
  // that would return DND_REASON_STATUS_INTERVAL_NOT_MATCH is disabled).

  /* ---- timezone: a mismatch only counts when checkTime differs too ---- */
  if (taosStrcasecmp(pCfg->timezone, tsTimezoneStr) != 0 && pMnode->checkTime != pCfg->checkTime) {
    mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
           pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
    terrno = TSDB_CODE_DNODE_INVALID_TIMEZONE;
    return DND_REASON_TIME_ZONE_NOT_MATCH;
  }

  /* ---- locale / charset ---- */
  if (taosStrcasecmp(pCfg->locale, tsLocale) != 0) {
    mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
    terrno = TSDB_CODE_DNODE_INVALID_LOCALE;
    return DND_REASON_LOCALE_NOT_MATCH;
  }

  if (taosStrcasecmp(pCfg->charset, tsCharset) != 0) {
    mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
    terrno = TSDB_CODE_DNODE_INVALID_CHARSET;
    return DND_REASON_CHARSET_NOT_MATCH;
  }

  /* ---- scalar switches ---- */
  if (pCfg->ttlChangeOnWrite != tsTtlChangeOnWrite) {
    mError("dnode:%d, ttlChangeOnWrite:%d inconsistent with cluster:%d", pDnode->id, pCfg->ttlChangeOnWrite,
           tsTtlChangeOnWrite);
    terrno = TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR;
    return DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH;
  }

  // Normalise the local switch to 0/1 before comparing.
  int8_t localWhiteList = tsEnableWhiteList ? 1 : 0;
  if (pCfg->enableWhiteList != localWhiteList) {
    mError("dnode:%d, enableWhiteList:%d inconsistent with cluster:%d", pDnode->id, pCfg->enableWhiteList, localWhiteList);
    terrno = TSDB_CODE_DNODE_INVALID_EN_WHITELIST;
    return DND_REASON_ENABLE_WHITELIST_NOT_MATCH;
  }

  /* ---- encryption key: skipped while an encryption is in progress ---- */
  if (!atomic_load_8(&pMnode->encryptMgmt.encrypting) &&
      (pCfg->encryptionKeyStat != tsEncryptionKeyStat || pCfg->encryptionKeyChksum != tsEncryptionKeyChksum)) {
    mError("dnode:%d, encryptionKey:%" PRIi8 "-%u inconsistent with cluster:%" PRIi8 "-%u", pDnode->id,
           pCfg->encryptionKeyStat, pCfg->encryptionKeyChksum, tsEncryptionKeyStat, tsEncryptionKeyChksum);
    if (pCfg->encryptionKeyChksum) {
      terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    } else {
      terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
    }
    return DND_REASON_ENCRYPTION_KEY_NOT_MATCH;
  }

  return DND_REASON_ONLINE;
}
556

557
/**
 * Average number of items applied per millisecond between two samples.
 *
 * @param currentCount  counter value of the newer sample.
 * @param lastCount     counter value of the older sample.
 * @param currentTimeMs timestamp of the newer sample, in milliseconds.
 * @param lastTimeMs    timestamp of the older sample, in milliseconds.
 * @return (currentCount - lastCount) / (currentTimeMs - lastTimeMs), or 0.0
 *         when either the time or the counter did not strictly advance.
 */
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
  if (currentTimeMs > lastTimeMs && currentCount > lastCount) {
    // Both differences are strictly positive here.
    return (double)(currentCount - lastCount) / (double)(currentTimeMs - lastTimeMs);
  }
  return 0.0;
}
567

568
/**
 * Fold the load reported for one vnode replica into its SVnodeGid entry.
 *
 * Progress counters (applied/commit/total index, buffer segment usage,
 * learner progress, snapshot sequence) are always copied. While the replica
 * has committed entries still to apply, appliedRate and
 * lastSyncAppliedIndexUpdateTime are refreshed. Role-related fields
 * (syncState, syncTerm, syncRestore, syncCanRead, startTimeMs, roleTimeMs)
 * are copied only when at least one of them differs from the report.
 *
 * @param vgId   vgroup id, used for logging only.
 * @param pGid   replica entry to update.
 * @param pVload load reported by the dnode.
 * @return true when the role-related fields were updated.
 */
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // A reported term of -1 is never treated as a term change.
  const bool syncStateDiffers = (pGid->syncState != pVload->syncState);
  const bool syncTermDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  const bool roleTimeDiffers = (pGid->roleTimeMs != pVload->roleTimeMs);
  const bool roleChanged = syncStateDiffers || syncTermDiffers || roleTimeDiffers;

  const bool applyBehindCommit = (pVload->syncCommitIndex > pVload->syncAppliedIndex);
  if (applyBehindCommit && pGid->lastSyncAppliedIndexUpdateTime == 0) {
    // First observation: only remember when it happened.
    pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
  } else if (applyBehindCommit && pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
    int64_t nowMs = taosGetTimestampMs();
    pGid->appliedRate =
        calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, nowMs, pGid->lastSyncAppliedIndexUpdateTime);
    pGid->lastSyncAppliedIndexUpdateTime = nowMs;
  }

  pGid->syncAppliedIndex = pVload->syncAppliedIndex;
  pGid->syncCommitIndex = pVload->syncCommitIndex;
  pGid->syncTotalIndex = pVload->syncTotalIndex;
  pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
  pGid->bufferSegmentSize = pVload->bufferSegmentSize;
  pGid->learnerProgress = pVload->learnerProgress;
  pGid->snapSeq = pVload->snapSeq;

  if ((pVload->snapSeq > 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END) ||
      pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
    mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
          " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
          vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
          pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
  }

  const bool nothingChanged = !roleChanged && pGid->syncRestore == pVload->syncRestore &&
                              pGid->syncCanRead == pVload->syncCanRead && pGid->startTimeMs == pVload->startTimeMs;
  if (nothingChanged) {
    return false;
  }

  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}
617

618
/**
 * Fold the load reported for an mnode into its SMnodeObj.
 *
 * syncState, syncTerm, syncRestore and roleTimeMs are copied from the report
 * when the sync state, the term (a reported term of -1 is ignored), the role
 * time or the restore flag differs.
 *
 * @param pObj   mnode object to update.
 * @param pMload load reported by the dnode.
 * @return true when the object was updated.
 */
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  const bool syncStateDiffers = (pObj->syncState != pMload->syncState);
  const bool syncTermDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  const bool roleTimeDiffers = (pObj->roleTimeMs != pMload->roleTimeMs);
  const bool restoreDiffers = (pObj->syncRestore != pMload->syncRestore);

  if (!(syncStateDiffers || syncTermDiffers || roleTimeDiffers || restoreDiffers)) {
    return false;
  }

  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);
  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}
635

636
extern char   *tsMonFwUri;
637
extern char   *tsMonSlowLogUri;
638
/**
 * Handle TDMT_MND_STATIS: forward the reported content to the monitor
 * endpoint that matches its type (tsMonFwUri for MONITOR_TYPE_COUNTER,
 * tsMonSlowLogUri for MONITOR_TYPE_SLOW_LOG). Other types are dropped.
 *
 * @param pReq incoming RPC message carrying a serialised SStatisReq.
 * @return 0 on success, or the deserialisation error.
 */
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
  SStatisReq statisReq = {0};

  int32_t code = tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq);
  if (code != 0) {
    // Release anything the decoder may have attached before it failed.
    // NOTE(review): assumes tFreeSStatisReq() is safe on a zeroed or
    // partially filled request - confirm against its implementation.
    tFreeSStatisReq(&statisReq);
    TAOS_RETURN(code);
  }

  if (tsMonitorLogProtocol) {
    mInfo("process statis req,\n %s", statisReq.pCont);
  }

  if (statisReq.type == MONITOR_TYPE_COUNTER) {
    monSendContent(statisReq.pCont, tsMonFwUri);
  } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) {
    monSendContent(statisReq.pCont, tsMonSlowLogUri);
  }

  tFreeSStatisReq(&statisReq);
  return 0;
}
658

659
/**
 * Handle TDMT_MND_AUDIT: record one audit entry.
 *
 * The request is ignored (and 0 returned) unless auditing is enabled and
 * tsAuditLevel is at least AUDIT_LEVEL_DATA.
 *
 * @param pReq incoming RPC message carrying a serialised SAuditReq.
 * @return 0 on success or when ignored, or the deserialisation error.
 */
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
    SMnode   *pMnode = pReq->info.node;
    SAuditReq auditReq = {0};

    int32_t code = tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq);
    if (code != 0) {
      // Release anything the decoder may have attached before it failed.
      // NOTE(review): assumes tFreeSAuditReq() is safe on a zeroed or
      // partially filled request - confirm against its implementation.
      tFreeSAuditReq(&auditReq);
      TAOS_RETURN(code);
    }

    mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);

    auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
                   auditReq.sqlLen, auditReq.duration, auditReq.affectedRows);

    tFreeSAuditReq(&auditReq);
  }
  return 0;
}
676

677
/**
 * Handle TDMT_MND_BATCH_AUDIT: record every audit entry of a batch.
 *
 * The request is ignored (and 0 returned) unless auditing is enabled and
 * tsAuditLevel is at least AUDIT_LEVEL_DATA.
 *
 * @param pReq incoming RPC message carrying a serialised SBatchAuditReq.
 * @return 0 on success or when ignored, or the deserialisation error.
 */
static int32_t mndProcessBatchAuditReq(SRpcMsg *pReq) {
  mTrace("process audit req:%p", pReq);
  if (tsEnableAudit && tsAuditLevel >= AUDIT_LEVEL_DATA) {
    SMnode        *pMnode = pReq->info.node;
    SBatchAuditReq auditReq = {0};

    int32_t code = tDeserializeSBatchAuditReq(pReq->pCont, pReq->contLen, &auditReq);
    if (code != 0) {
      // Release the entries decoded before the failure.
      // NOTE(review): assumes tFreeSBatchAuditReq() is safe on a zeroed or
      // partially filled request and that the decoder does not already
      // free it on error - confirm against their implementations.
      tFreeSBatchAuditReq(&auditReq);
      TAOS_RETURN(code);
    }

    int32_t nAudit = taosArrayGetSize(auditReq.auditArr);

    for (int32_t i = 0; i < nAudit; ++i) {
      SAuditReq *audit = TARRAY_GET_ELEM(auditReq.auditArr, i);
      mDebug("received audit req:%s, %s, %s, %s", audit->operation, audit->db, audit->table, audit->pSql);

      auditAddRecord(pReq, pMnode->clusterId, audit->operation, audit->db, audit->table, audit->pSql, audit->sqlLen,
                     audit->duration, audit->affectedRows);
    }

    tFreeSBatchAuditReq(&auditReq);
  }
  return 0;
}
699

700
/*
 * Queue a TDMT_MND_UPDATE_DNODE_INFO message on the mnode WRITE_QUEUE so that
 * the dnode's id and machineId are persisted by the write path.
 *
 * pMnode  mnode whose message callback is used to enqueue the request
 * pDnode  dnode whose id and machineId are copied into the request
 *
 * Returns 0 on success, otherwise a negative error code. Failures that reach
 * the _exit label are also logged.
 */
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
  int32_t       code = 0, lino = 0;
  SDnodeInfoReq infoReq = {0};
  int32_t       contLen = 0;
  void         *pReq = NULL;

  infoReq.dnodeId = pDnode->id;
  tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);

  // First pass with a NULL buffer: the returned length sizes the allocation below.
  if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
    TAOS_RETURN(contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY);
  }
  pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    TAOS_RETURN(terrno);
  }

  if ((contLen = tSerializeSDnodeInfoReq(pReq, contLen, &infoReq)) <= 0) {
    // The buffer was never handed to the queue, so it must be released here.
    // A zero length is mapped to an error in the same way as the sizing pass
    // above, instead of being reported as success.
    code = contLen ? contLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pReq);
    pReq = NULL;
    goto _exit;
  }

  // NOTE(review): ownership of pReq when tmsgPutToQueue() fails is not visible
  // from this function; that path is left exactly as it was.
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
  TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
_exit:
  if (code < 0) {
    mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
  }
  TAOS_RETURN(code);
}
730

731
/*
 * Handle TDMT_MND_UPDATE_DNODE_INFO: persist the current in-memory state of a
 * dnode object through an "update-dnode-obj" transaction.
 *
 * The request only carries the dnode id (and machine id); the object is looked
 * up, its updateTime is refreshed, and the encoded object is appended as the
 * transaction's commit log before the transaction is prepared.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
  int32_t       code = 0, lino = 0;
  SMnode       *pMnode = pReq->info.node;
  SDnodeInfoReq infoReq = {0};
  SDnodeObj    *pDnode = NULL;
  STrans       *pTrans = NULL;
  SSdbRaw      *pCommitRaw = NULL;

  TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  // Each NULL result below leaves the function unconditionally. Relying on
  // terrno alone would fall through to a NULL dereference if the callee failed
  // without setting it.
  pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  pDnode->updateTime = taosGetTimestampMs();

  if ((pCommitRaw = mndDnodeActionEncode(pDnode)) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // Once appended, the raw is released together with the transaction. Drop the
  // local reference before any further fallible call so the cleanup below
  // cannot free it a second time.
  SSdbRaw *pAppendedRaw = pCommitRaw;
  pCommitRaw = NULL;
  TAOS_CHECK_EXIT(sdbSetRawStatus(pAppendedRaw, SDB_STATUS_READY));

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

_exit:
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  sdbFreeRaw(pCommitRaw);  // non-NULL only when the raw was never appended
  TAOS_RETURN(code);
}
777

778
/*
 * Handle a periodic status report sent by a dnode.
 *
 * Steps, in order:
 *   1. decode the request and reject reports from a different cluster;
 *   2. resolve the reporting dnode (by id, falling back to its endpoint);
 *   3. work out whether anything the dnode cares about has changed (dnode/mnode
 *      table versions, white-list versions, audit db/token/epset, reboot, ...);
 *   4. reject reports whose clock differs too much from the local clock;
 *   5. copy vnode, mnode and qnode load figures into the in-memory objects;
 *   6. when a check is needed, refresh the dnode object and build a SStatusRsp
 *      that is attached to pReq->info.rsp;
 *   7. update access bookkeeping and the cluster info.
 *
 * Returns the result of mndUpdClusterInfo() on success, otherwise the error
 * code of the failing step (also logged together with the failing line).
 */
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SStatusReq statusReq = {0};
  SDnodeObj *pDnode = NULL;
  int32_t    code = -1;
  int32_t    lino = 0;

  TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), &lino, _OVER);

  int64_t clusterid = mndGetClusterId(pMnode);
  if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
    code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
    mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
          statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  if (statusReq.dnodeId == 0) {
    // The dnode has not been assigned an id yet: it can only be found by endpoint.
    pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
    if (pDnode == NULL) {
      mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
  } else {
    pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
    if (pDnode == NULL) {
      int32_t err = terrno;
      pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
      if (pDnode != NULL) {
        // The endpoint is known but registered under a different id.
        pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }

      mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err);
      if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) {
        code = err;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      } else {
        pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp);
        // NOTE(review): on this exit `code` keeps whatever value it had (normally
        // the initial -1); kept as-is because the peer may interpret specific codes.
        if (pDnode == NULL) goto _OVER;
      }
    }
  }

  pMnode->ipWhiteVer = mndGetIpWhiteListVersion(pMnode);
  pMnode->timeWhiteVer = mndGetTimeWhiteListVersion(pMnode);

  int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
  int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
  bool    reboot = (pDnode->rebootTime != statusReq.rebootTime);
  bool    supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
  bool    encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
  bool    enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
  bool    analVerChanged = (analVer != statusReq.analVer);
  bool    auditDBChanged = false;
  char    auditDB[TSDB_DB_FNAME_LEN] = {0};
  bool    auditInfoChanged = false;
  char    auditToken[TSDB_TOKEN_LEN] = {0};

  // ---- audit db / token comparison -------------------------------------
  SDbObj *pDb = NULL;
  if (tsAuditUseToken || tsAuditSaveInSelf) {
    pDb = mndAcquireAuditDb(pMnode);
  }
  if (tsAuditUseToken) {
    if (pDb != NULL) {
      SName name = {0};
      if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) {
        mError("db:%s, failed to parse db name", pDb->name);
      }
      // On parse failure `name` is still zero-initialised, so auditDB stays empty.
      tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN);
    }
    if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true;

    char    auditUser[TSDB_USER_LEN] = {0};
    int32_t ret = 0;
    if ((ret = mndGetAuditUser(pMnode, auditUser)) != 0) {
      mTrace("dnode:%d, failed to get audit user since %s", pDnode->id, tstrerror(ret));
    } else {
      mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser);
      // NOTE(review): the token is looked up for the literal user "audit" rather
      // than for auditUser - confirm this is intended.
      if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) {
        mTrace("dnode:%d, failed to get audit user active token, token:xxxx, since %s", pDnode->id, tstrerror(ret));
      } else {
        mTrace("dnode:%d, get audit user active token:xxxx", pDnode->id);
        if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditInfoChanged = true;
      }
    }
  }

  // ---- audit vgroup / epset comparison ---------------------------------
  SEpSet  auditVnodeEpSet = {0};
  int32_t auditVgId = 0;
  if (tsAuditSaveInSelf) {
    if (pDb != NULL) {
      // Pick the first vgroup that belongs to the audit db.
      void   *pIter = NULL;
      SVgObj *pVgroup = NULL;
      while (1) {
        pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) break;

        if (mndVgroupInDb(pVgroup, pDb->uid)) {
          auditVnodeEpSet = mndGetVgroupEpset(pMnode, pVgroup);
          auditVgId = pVgroup->vgId;
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pVgroup);
          break;
        }
        sdbRelease(pMnode->pSdb, pVgroup);
      }
    }

    if (auditVnodeEpSet.numOfEps != statusReq.auditEpSet.numOfEps) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit epset num changed, auditNum:%d, inReq:%d", pDnode->id, auditVnodeEpSet.numOfEps,
             statusReq.auditEpSet.numOfEps);
    } else {
      for (int32_t i = 0; i < auditVnodeEpSet.numOfEps; i++) {
        if (strncmp(auditVnodeEpSet.eps[i].fqdn, statusReq.auditEpSet.eps[i].fqdn, TSDB_FQDN_LEN) != 0 ||
            auditVnodeEpSet.eps[i].port != statusReq.auditEpSet.eps[i].port) {
          // do not need to check InUse here, because inUse is not accurate at every time
          auditInfoChanged = true;
          mTrace("dnode:%d, audit epset changed at item:%d, fqdn:%s:%d:, inReq:%s:%d", pDnode->id, i,
                 auditVnodeEpSet.eps[i].fqdn, auditVnodeEpSet.eps[i].port, statusReq.auditEpSet.eps[i].fqdn,
                 statusReq.auditEpSet.eps[i].port);
          break;
        }
      }
    }

    if (auditVgId != statusReq.auditVgId) {
      auditInfoChanged = true;
      mTrace("dnode:%d, audit vgId changed, auditVgId:%d, inReq:%d", pDnode->id, auditVgId, statusReq.auditVgId);
    }
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
                   pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer ||
                   encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditInfoChanged;
  const STraceId *trace = &pReq->info.traceId;
  char            timestamp[TD_TIME_STR_LEN] = {0};
  if (mDebugFlag & DEBUG_TRACE) {
    (void)formatTimestampLocal(timestamp, sizeof(timestamp), statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
  }
  mGTrace(
      "dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
      "timestamp:%s",
      pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);

  if (reboot) {
    tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
  }

  // ---- clock skew check ------------------------------------------------
  // llabs() instead of labs(): delta is 64-bit, and `long` is only 32 bits wide
  // on some supported platforms.
  int64_t delta = curMs / 1000 - statusReq.timestamp / 1000;
  if (llabs(delta) >= tsTimestampDeltaLimit) {
    terrno = TSDB_CODE_TIME_UNSYNCED;
    code = terrno;

    pDnode->offlineReason = DND_REASON_TIME_UNSYNC;
    mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId,
           tstrerror(code), tsTimestampDeltaLimit);
    TAOS_CHECK_GOTO(code, &lino, _OVER);
  }

  // ---- vnode loads -----------------------------------------------------
  for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
    SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
    if (pVgroup != NULL) {
      // Only the (assigned) leader's figures are copied into the vgroup object.
      if (pVload->syncState == TAOS_SYNC_STATE_LEADER || pVload->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        pVgroup->cacheUsage = pVload->cacheUsage;
        pVgroup->numOfCachedTables = pVload->numOfCachedTables;
        pVgroup->numOfTables = pVload->numOfTables;
        pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
        pVgroup->totalStorage = pVload->totalStorage;
        pVgroup->compStorage = pVload->compStorage;
        pVgroup->pointsWritten = pVload->pointsWritten;
      }
      bool stateChanged = false;
      for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
        SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
        if (pGid->dnodeId == statusReq.dnodeId) {
          // Missing start/role times default to the dnode's reboot time.
          if (pVload->startTimeMs == 0) {
            pVload->startTimeMs = statusReq.rebootTime;
          }
          if (pVload->roleTimeMs == 0) {
            pVload->roleTimeMs = statusReq.rebootTime;
          }
          stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
          break;
        }
      }
      if (stateChanged) {
        SDbObj *pVgDb = mndAcquireDb(pMnode, pVgroup->dbName);
        if (pVgDb != NULL && pVgDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pVgDb->name,
                pVgDb->stateTs, curMs);
          pVgDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pVgDb);
      }
    }

    mndReleaseVgroup(pMnode, pVgroup);
  }

  // ---- mnode / qnode loads ---------------------------------------------
  SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
  if (pObj != NULL) {
    if (statusReq.mload.roleTimeMs == 0) {
      statusReq.mload.roleTimeMs = statusReq.rebootTime;
    }
    (void)mndUpdateMnodeState(pObj, &statusReq.mload);
    mndReleaseMnode(pMnode, pObj);
  }

  SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
  if (pQnode != NULL) {
    pQnode->load = statusReq.qload;
    mndReleaseQnode(pMnode, pQnode);
  }

  // ---- full check + response -------------------------------------------
  if (needCheck) {
    // pDnode has been dereferenced above, so it is known to be non-NULL here.
    if (statusReq.sver != tsVersion) {
      pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
      mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
      code = TSDB_CODE_VERSION_NOT_COMPATIBLE;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    if (statusReq.dnodeId == 0) {
      mInfo("dnode:%d, %s first access, clusterId:%" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
    } else {
      if (statusReq.clusterId != pMnode->clusterId) {
        pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
        mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
               pMnode->clusterId);
        code = TSDB_CODE_MND_INVALID_CLUSTER_ID;
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    // Verify whether the cluster parameters are consistent when status change from offline to ready
    // pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
    // if (pDnode->offlineReason != 0) {
    //   mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
    //   if (terrno == 0) terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
    //   goto _OVER;
    // }

    if (!online) {
      mInfo("dnode:%d, from offline to online, memory avail:%" PRId64 " total:%" PRId64 " cores:%.2f", pDnode->id,
            statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
    } else {
      mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64
            " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d "
            "enableWhiteListChanged:%d auditDBChanged:%d auditInfoChanged:%d pMnode->ipWhiteVer:%" PRId64
            " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64,
            pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged,
            analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditInfoChanged,
            pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer);
    }

    pDnode->rebootTime = statusReq.rebootTime;
    pDnode->numOfCores = statusReq.numOfCores;
    pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
    pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
    pDnode->memAvail = statusReq.memAvail;
    pDnode->memTotal = statusReq.memTotal;
    pDnode->encryptionKeyStat = statusReq.clusterCfg.encryptionKeyStat;
    pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum;
    if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) {
      // machineId changed: update the in-memory copy and queue a persisting update.
      tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
      if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) {
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
    }

    SStatusRsp statusRsp = {0};
    statusRsp.statusSeq++;
    statusRsp.analVer = analVer;
    statusRsp.dnodeVer = dnodeVer;
    statusRsp.dnodeCfg.dnodeId = pDnode->id;
    statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
    statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
    if (statusRsp.pDnodeEps == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
    statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
    statusRsp.timeWhiteVer = pMnode->timeWhiteVer;

    if (auditInfoChanged || auditDBChanged) {
      if (tsAuditUseToken) {
        if (auditDB[0] != '\0') {
          mInfo("dnode:%d, set audit db:%s in process status rsp", statusReq.dnodeId, auditDB);
          tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN);
        }
        if (auditToken[0] != '\0') {
          mInfo("dnode:%d, set audit token:xxxx in process status rsp", statusReq.dnodeId);
          tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN);
        }
      }

      if (tsAuditSaveInSelf) {
        mInfo("dnode:%d, set audit epset and vgId:%d in process status rsp", statusReq.dnodeId, auditVgId);
        statusRsp.auditEpSet = auditVnodeEpSet;
        statusRsp.auditVgId = auditVgId;
      }
    }

    // Size pass first; every failure below releases both the endpoint array
    // and (once allocated) the response buffer before leaving.
    int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
    if (contLen < 0) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = contLen;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      taosArrayDestroy(statusRsp.pDnodeEps);
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
    contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp);
    taosArrayDestroy(statusRsp.pDnodeEps);
    if (contLen < 0) {
      rpcFreeCont(pHead);  // not attached to pReq yet, so nobody else frees it
      code = contLen;
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }

    pReq->info.rspLen = contLen;
    pReq->info.rsp = pHead;
  }

  // ---- access bookkeeping ----------------------------------------------
  pDnode->accessTimes++;
  pDnode->lastAccessTime = curMs;
  if ((DND_REASON_ONLINE != pDnode->offlineReason) && (online || mndIsDnodeOnline(pDnode, curMs))) {
    pDnode->offlineReason = DND_REASON_ONLINE;
  }
  code = 0;

_OVER:
  mndReleaseDnode(pMnode, pDnode);
  taosArrayDestroy(statusReq.pVloads);
  if (code != 0) {
    mError("dnode:%d, failed to process status req at line:%d since %s", statusReq.dnodeId, lino, tstrerror(code));
    return code;
  }

  return mndUpdClusterInfo(pReq);
}
1129

1130
/*
 * Handle a notify message from a dnode.
 *
 * After decoding, the message is rejected when it names a different cluster.
 * Otherwise the time-series count of every reported vgroup that can be
 * acquired is refreshed, and the cluster info is updated.
 *
 * Returns 0 / the result of mndUpdClusterInfo() on success, the decode error
 * (also stored in terrno) on malformed input, or
 * TSDB_CODE_MND_DNODE_DIFF_CLUSTER on a cluster id mismatch.
 */
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SNotifyReq notifyReq = {0};
  int32_t    code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq);

  if (code != 0) {
    terrno = code;
  } else {
    int64_t localClusterId = mndGetClusterId(pMnode);
    bool    foreign = (notifyReq.clusterId != 0) && (notifyReq.clusterId != localClusterId);

    if (foreign) {
      code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
      mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
            notifyReq.clusterId, localClusterId, tstrerror(code));
    } else {
      int32_t numLoads = taosArrayGetSize(notifyReq.pVloads);
      for (int32_t idx = 0; idx < numLoads; ++idx) {
        SVnodeLoadLite *pLite = taosArrayGet(notifyReq.pVloads, idx);
        SVgObj         *pVg = mndAcquireVgroup(pMnode, pLite->vgId);
        if (pVg == NULL) {
          continue;  // unknown vgroup: nothing to refresh
        }
        pVg->numOfTimeSeries = pLite->nTimeSeries;
        mndReleaseVgroup(pMnode, pVg);
      }
      code = mndUpdClusterInfo(pReq);
    }
  }

  // Single release point for the decoded request, whatever the outcome.
  tFreeSNotifyReq(&notifyReq);
  return code;
}
1163

1164
/*
 * Create a new dnode object through a "create-dnode" transaction.
 *
 * pMnode   mnode that owns the sdb and the transaction
 * pReq     originating RPC request, attached to the transaction
 * pCreate  decoded create request supplying fqdn and port
 *
 * The new object takes its id from sdbGetMaxId(SDB_DNODE), and its endpoint
 * string is "<fqdn>:<port>". Returns 0 once the transaction is prepared,
 * otherwise the error code of the failing step.
 */
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
  int32_t  code = -1;
  SSdbRaw *pRaw = NULL;
  STrans  *pTrans = NULL;

  SDnodeObj dnodeObj = {0};
  dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
  dnodeObj.createdTime = taosGetTimestampMs();
  dnodeObj.updateTime = dnodeObj.createdTime;
  dnodeObj.port = pCreate->port;
  tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
  // Use the full buffer: snprintf already reserves the terminating NUL, so
  // passing "size - 1" dropped one extra character and could store an endpoint
  // that differs from the one the caller used for its duplicate check.
  (void)snprintf(dnodeObj.ep, sizeof(dnodeObj.ep), "%s:%u", pCreate->fqdn, pCreate->port);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  pRaw = mndDnodeActionEncode(&dnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);

  // Once appended, the raw is released together with the transaction. Drop the
  // local reference before the next fallible call so the cleanup below cannot
  // free it a second time.
  SSdbRaw *pAppendedRaw = pRaw;
  pRaw = NULL;
  TAOS_CHECK_GOTO(sdbSetRawStatus(pAppendedRaw, SDB_STATUS_READY), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  sdbFreeRaw(pRaw);  // non-NULL only when the raw was never appended
  return code;
}
1204

1205
/*
 * Handle a dnode-list request: collect the id and endpoint (fqdn + port) of
 * every dnode in the sdb and return them serialized in pReq->info.rsp.
 *
 * Returns 0 on success. On failure nothing is attached to pReq, the error is
 * logged, and a non-zero code is returned.
 */
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SSdb         *pSdb = pMnode->pSdb;
  SDnodeObj    *pObj = NULL;
  void         *pIter = NULL;
  SDnodeListRsp rsp = {0};
  int32_t       code = -1;

  rsp.dnodeList = taosArrayInit(5, sizeof(SDNodeAddr));
  if (NULL == rsp.dnodeList) {
    mError("failed to alloc epSet while process dnode list req");
    code = terrno;
    goto _OVER;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    // Each dnode is reported as a single-endpoint set.
    SDNodeAddr dnodeAddr = {0};
    dnodeAddr.nodeId = pObj->id;
    dnodeAddr.epSet.numOfEps = 1;
    tstrncpy(dnodeAddr.epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
    dnodeAddr.epSet.eps[0].port = pObj->port;

    if (taosArrayPush(rsp.dnodeList, &dnodeAddr) == NULL) {
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _OVER;
    }

    sdbRelease(pSdb, pObj);
  }

  // Size pass: a negative result is an error and must not reach the allocator.
  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _OVER;
  }
  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSDnodeListRsp(pRsp, rspLen, &rsp)) <= 0) {
    // The buffer is not attached to pReq yet, so it has to be released here.
    // A zero length is reported as a failure rather than as a successful reply
    // without a payload.
    code = rspLen ? rspLen : TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:

  if (code != 0) {
    mError("failed to get dnode list since %s", tstrerror(code));
  }

  tFreeSDnodeListRsp(&rsp);

  TAOS_RETURN(code);
}
1266

1267
void getSlowLogScopeString(int32_t scope, char *result) {
1,176✔
1268
  if (scope == SLOW_LOG_TYPE_NULL) {
1,176✔
1269
    (void)strncat(result, "NONE", 64);
×
UNCOV
1270
    return;
×
1271
  }
1272
  while (scope > 0) {
2,352✔
1273
    if (scope & SLOW_LOG_TYPE_QUERY) {
1,176✔
1274
      (void)strncat(result, "QUERY", 64);
1,176✔
1275
      scope &= ~SLOW_LOG_TYPE_QUERY;
1,176✔
1276
    } else if (scope & SLOW_LOG_TYPE_INSERT) {
×
1277
      (void)strncat(result, "INSERT", 64);
×
1278
      scope &= ~SLOW_LOG_TYPE_INSERT;
×
1279
    } else if (scope & SLOW_LOG_TYPE_OTHERS) {
×
1280
      (void)strncat(result, "OTHERS", 64);
×
UNCOV
1281
      scope &= ~SLOW_LOG_TYPE_OTHERS;
×
1282
    } else {
1283
      (void)printf("invalid slow log scope:%d", scope);
×
UNCOV
1284
      return;
×
1285
    }
1286

1287
    if (scope > 0) {
1,176✔
UNCOV
1288
      (void)strncat(result, "|", 64);
×
1289
    }
1290
  }
1291
}
1292

1293
/*
 * Handle a CREATE DNODE request.
 *
 * Order of checks: grants (dnode count, CPU cores), request decoding, operation
 * privilege, endpoint validity, duplicate endpoint. When all pass, the dnode is
 * created through mndCreateDnode() and, if the audit level allows, an audit
 * record is written regardless of whether the creation succeeded.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was started,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SCreateDnodeReq createReq = {0};
  SDnodeObj      *pDnode = NULL;
  int32_t         code = -1;
  int32_t         lino = 0;
  int64_t         startMs = taosGetTimestampMs();

  code = grantCheck(TSDB_GRANT_DNODE);
  if (code != 0) {
    goto _OVER;
  }
  code = grantCheck(TSDB_GRANT_CPU_CORES);
  if (code != 0) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);

  mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_DNODE), &lino,
                  _OVER);

  bool emptyFqdn = (createReq.fqdn[0] == 0);
  bool badPort = (createReq.port <= 0) || (createReq.port > UINT16_MAX);
  if (emptyFqdn || badPort) {
    code = TSDB_CODE_MND_INVALID_DNODE_EP;
    goto _OVER;
  }
  // code = taosValidFqdn(tsEnableIpv6, createReq.fqdn);
  // if (code != 0) {
  //   mError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6, tsLocalFqdn,
  //          tstrerror(code));
  //   goto _OVER;
  // }

  // Refuse to create a dnode whose endpoint is already registered.
  char endpoint[TSDB_EP_LEN];
  (void)snprintf(endpoint, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
  pDnode = mndAcquireDnodeByEp(pMnode, endpoint);
  if (pDnode != NULL) {
    code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
    goto _OVER;
  }

  code = mndCreateDnode(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    tsGrantHBInterval = 5;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char auditObj[200] = {0};
    (void)snprintf(auditObj, sizeof(auditObj), "%s:%d", createReq.fqdn, createReq.port);

    // Elapsed time since the request entered this handler, in seconds.
    int64_t endMs = taosGetTimestampMs();
    double  elapsedSec = (double)(endMs - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createDnode", "", auditObj, createReq.sql, createReq.sqlLen, elapsedSec, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  tFreeSCreateDnodeReq(&createReq);
  TAOS_RETURN(code);
}
1356

1357
/* Implemented elsewhere when TD_ENTERPRISE is defined; stubbed below otherwise. */
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);

/* Entry point for restore-dnode requests: forwards to the implementation. */
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq) {
  int32_t code = mndProcessRestoreDnodeReqImpl(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0. */
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
1364

1365
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
9,704✔
1366
                            SSnodeObj *pSObj, SBnodeObj *pBObj, int32_t numOfVnodes, bool force, bool unsafe) {
1367
  int32_t  code = -1;
9,704✔
1368
  SSdbRaw *pRaw = NULL;
9,704✔
1369
  STrans  *pTrans = NULL;
9,704✔
1370
  int32_t  lino = 0;
9,704✔
1371

1372
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
9,704✔
1373
  if (pTrans == NULL) {
9,704✔
1374
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1375
    if (terrno != 0) code = terrno;
×
UNCOV
1376
    goto _OVER;
×
1377
  }
1378
  mndTransSetGroupParallel(pTrans);
9,704✔
1379
  mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
9,704✔
1380
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), &lino, _OVER);
9,704✔
1381

1382
  pRaw = mndDnodeActionEncode(pDnode);
9,704✔
1383
  if (pRaw == NULL) {
9,704✔
1384
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1385
    if (terrno != 0) code = terrno;
×
UNCOV
1386
    goto _OVER;
×
1387
  }
1388
  TAOS_CHECK_GOTO(mndTransAppendGroupRedolog(pTrans, pRaw, -1), &lino, _OVER);
9,704✔
1389
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING), &lino, _OVER);
9,704✔
1390
  pRaw = NULL;
9,704✔
1391

1392
  pRaw = mndDnodeActionEncode(pDnode);
9,704✔
1393
  if (pRaw == NULL) {
9,704✔
1394
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1395
    if (terrno != 0) code = terrno;
×
UNCOV
1396
    goto _OVER;
×
1397
  }
1398
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _OVER);
9,704✔
1399
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED), &lino, _OVER);
9,704✔
1400
  pRaw = NULL;
9,704✔
1401

1402
  if (pSObj != NULL) {
9,704✔
1403
    mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
589✔
1404
    TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pSObj, pTrans, force), &lino, _OVER);
589✔
1405
  }
1406

1407
  if (pMObj != NULL) {
9,704✔
1408
    mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
237✔
1409
    TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), &lino, _OVER);
237✔
1410
  }
1411

1412
  if (pQObj != NULL) {
9,704✔
1413
    mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
151✔
1414
    TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), &lino, _OVER);
151✔
1415
  }
1416

1417
  if (pBObj != NULL) {
9,704✔
1418
    mInfo("trans:%d, bnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
813✔
1419
    TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pBObj, force), &lino, _OVER);
813✔
1420
  }
1421

1422
  if (numOfVnodes > 0) {
8,891✔
1423
    mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
7,377✔
1424
    TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), &lino, _OVER);
7,377✔
1425
  }
1426

1427
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
8,891✔
1428

1429
  code = 0;
8,891✔
1430

1431
_OVER:
9,704✔
1432
  if (code != 0) mError("dnode:%d, failed to drop dnode at line:%d since %s", pDnode->id, lino, tstrerror(code));
9,704✔
1433
  mndTransDrop(pTrans);
9,704✔
1434
  sdbFreeRaw(pRaw);
9,704✔
1435
  TAOS_RETURN(code);
9,704✔
1436
}
1437

1438
/*
 * Reports whether the dnode hosts nothing: no qnode, no snode, no mnode and no vnodes.
 * The checks run in that order and stop at the first component found.
 * NOTE(review): bnodes are not consulted here -- confirm that is intended.
 */
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
  bool       empty = false;
  SSnodeObj *pSnode = NULL;
  SMnodeObj *pMnodeObj = NULL;
  SQnodeObj *pQnode = mndAcquireQnode(pMnode, dnodeId);

  if (pQnode == NULL) {
    pSnode = mndAcquireSnode(pMnode, dnodeId);
    if (pSnode == NULL) {
      pMnodeObj = mndAcquireMnode(pMnode, dnodeId);
      if (pMnodeObj == NULL) {
        empty = (mndGetVnodesNum(pMnode, dnodeId) <= 0);
      }
    }
  }

  // Release whatever was acquired; objects never looked up are still NULL here.
  mndReleaseMnode(pMnode, pMnodeObj);
  mndReleaseQnode(pMnode, pQnode);
  mndReleaseSnode(pMnode, pSnode);
  return empty;
}
1463

1464
/*
 * Handles a DROP DNODE request.
 *
 * Steps: decode the request, check the caller's privilege, resolve the target
 * dnode (by id, falling back to "fqdn:port"), refuse to drop the last mnode or
 * the dnode this mnode runs on, validate the online/offline state against the
 * force flag, then build the drop transaction via mndDropDnode().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 * otherwise an error code. All acquired objects and the decoded request are
 * released on every path.
 *
 * NOTE(review): the privilege check uses MND_OPER_DROP_MNODE for a dnode drop,
 * and two per-vnode traces below are logged with mError although they are not
 * failures -- both kept as in the original, confirm they are intended.
 */
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDnodeObj    *pDnode = NULL;
  SMnodeObj    *pMObj = NULL;
  SQnodeObj    *pQObj = NULL;
  SSnodeObj    *pSObj = NULL;
  SBnodeObj    *pBObj = NULL;
  SDropDnodeReq dropReq = {0};
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
        dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MNODE), NULL, _OVER);

  // An unsafe drop implies force.
  bool force = dropReq.force;
  if (dropReq.unsafe) {
    force = true;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    // Remember the lookup-by-id error; it is what gets reported if the endpoint lookup fails too.
    int32_t err = terrno;
    char    ep[TSDB_EP_LEN + 1] = {0};
    // The fqdn comes from the request and must be data, never the format string.
    (void)snprintf(ep, sizeof(ep), "%s:%d", dropReq.fqdn, dropReq.port);
    pDnode = mndAcquireDnodeByEp(pMnode, ep);
    if (pDnode == NULL) {
      code = err;
      goto _OVER;
    }
  }

  // Use the id of the dnode actually resolved: when it was found by endpoint,
  // dropReq.dnodeId is not a usable key for the lookups below.
  const int32_t dnodeId = pDnode->id;

  pQObj = mndAcquireQnode(pMnode, dnodeId);
  pSObj = mndAcquireSnode(pMnode, dnodeId);
  pBObj = mndAcquireBnode(pMnode, dnodeId);
  pMObj = mndAcquireMnode(pMnode, dnodeId);
  if (pMObj != NULL) {
    if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
      code = TSDB_CODE_MND_TOO_FEW_MNODES;
      goto _OVER;
    }
    if (pMnode->selfDnodeId == dnodeId) {
      code = TSDB_CODE_MND_CANT_DROP_LEADER;
      goto _OVER;
    }
  }

#ifdef USE_MOUNT
  if (mndHasMountOnDnode(pMnode, dnodeId) && !force) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
    goto _OVER;
  }
#endif

  int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  bool    isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());

  // A forced drop is only accepted for a dnode that is offline.
  if (isonline && force) {
    code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
    mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d bnode:%d", pDnode->id,
           tstrerror(code), numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL, pBObj != NULL);
    goto _OVER;
  }

  mError("vnode num:%d", numOfVnodes);

  // Scan every vgroup for a replica on this dnode whose sync state is OFFLINE.
  bool    vnodeOffline = false;
  void   *pIter = NULL;
  int32_t vgId = -1;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      mError("vnode dnodeId:%d state:%d", pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].syncState);
      if (pVgroup->vnodeGid[i].dnodeId == pDnode->id) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_OFFLINE) {
          vgId = pVgroup->vgId;
          vnodeOffline = true;
          break;
        }
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (vnodeOffline) {
      // Leaving the iteration early: close the iterator explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  if (vnodeOffline && !force) {
    code = TSDB_CODE_VND_VNODE_OFFLINE;
    mError("dnode:%d, failed to drop since vgId:%d is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, vgId,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  if (!isonline && !force) {
    code = TSDB_CODE_DNODE_OFFLINE;
    mError("dnode:%d, failed to drop since dnode is offline, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id,
           numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
    goto _OVER;
  }

  code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, pBObj, numOfVnodes, force, dropReq.unsafe);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj1[30] = {0};
    (void)snprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);

    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseDnode(pMnode, pDnode);
  mndReleaseMnode(pMnode, pMObj);
  mndReleaseQnode(pMnode, pQObj);
  mndReleaseBnode(pMnode, pBObj);
  mndReleaseSnode(pMnode, pSObj);
  tFreeSDropDnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1600

1601
/*
 * Sends a TDMT_DND_CREATE_ENCRYPT_KEY request to the selected dnodes.
 *
 * pReq     - originating message (its mnode and trace id are used).
 * dnodeId  - target dnode id; -1 or 0 selects every dnode.
 * pDcfgReq - config/value pair to forward; the value length must lie in
 *            [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
 *
 * encryptMgmt.encrypting is switched 0 -> 1 while requests are outstanding, and
 * set back to 0 here when no request was sent. Dnodes whose offlineReason is
 * not DND_REASON_ONLINE are skipped with a warning.
 *
 * Returns 0 on success or an error code; terrno is set from the code when it
 * was still 0.
 */
static int32_t mndProcessCreateEncryptKeyReqImpl(SRpcMsg *pReq, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
  int32_t code = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int8_t  encrypting = 0;

  // Referenced by the mG* logging macros below.
  const STraceId *trace = &pReq->info.traceId;

  int32_t klen = strlen(pDcfgReq->value);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
    mGError("msg:%p, failed to create encrypt_key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    goto _exit;
  }

  // Only one key-creation round may be in flight at a time.
  if (0 != (encrypting = atomic_val_compare_exchange_8(&pMnode->encryptMgmt.encrypting, 0, 1))) {
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, encrypting:%" PRIi8, pReq, tstrerror(code), encrypting);
    goto _exit;
  }

  if (tsEncryptionKeyStat == ENCRYPT_KEY_STAT_SET || tsEncryptionKeyStat == ENCRYPT_KEY_STAT_LOADED) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
    code = TSDB_CODE_QRY_DUPLICATED_OPERATION;
    mGWarn("msg:%p, failed to create encrypt key since %s, stat:%" PRIi8 ", checksum:%u", pReq, tstrerror(code),
           tsEncryptionKeyStat, tsEncryptionKeyChksum);
    goto _exit;
  }

  atomic_store_16(&pMnode->encryptMgmt.nEncrypt, 0);
  atomic_store_16(&pMnode->encryptMgmt.nSuccess, 0);
  atomic_store_16(&pMnode->encryptMgmt.nFailed, 0);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send create encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    if (dnodeId == -1 || pDnode->id == dnodeId || dnodeId == 0) {
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
      void   *pBuf = rpcMallocCont(bufLen);

      // An allocation failure silently skips this dnode (kept as in the original).
      if (pBuf != NULL) {
        if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
          code = bufLen;
          // The buffer was never handed to the transport, so free it here, and
          // close the iterator because the loop is abandoned early.
          rpcFreeCont(pBuf);
          sdbRelease(pSdb, pDnode);
          sdbCancelFetch(pSdb, pIter);
          // Fall through to the nEncrypt check so `encrypting` is cleared when
          // nothing was sent.
          break;
        }
        SRpcMsg rpcMsg = {.msgType = TDMT_DND_CREATE_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
        if (0 == tmsgSendReq(&epSet, &rpcMsg)) {
          (void)atomic_add_fetch_16(&pMnode->encryptMgmt.nEncrypt, 1);
        }
      }
    }

    sdbRelease(pSdb, pDnode);
  }

  // No response will arrive to clear the flag if nothing was sent.
  if (atomic_load_16(&pMnode->encryptMgmt.nEncrypt) <= 0) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
1678

1679
/*
 * Handles a "create encrypt key" config request.
 *
 * When TD_ENTERPRISE or TD_ASTRA_TODO is defined: decodes the SMCfgDnodeReq,
 * checks MND_OPER_CONFIG_DNODE privilege and, if the config name is exactly
 * "encrypt_key" (case-insensitive), forwards name/value to
 * mndProcessCreateEncryptKeyReqImpl(). Any other config name yields
 * TSDB_CODE_MND_INTERNAL_ERROR. Otherwise the function returns 0 without
 * looking at the request.
 */
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
  int32_t code = 0;

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  SMnode       *pMnode = pReq->info.node;
  SMCfgDnodeReq cfgReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE)) != 0) {
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  // Length 12 covers the terminating NUL, so only the exact name matches.
  if (strncasecmp(cfgReq.config, "encrypt_key", 12) != 0) {
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    tFreeSMCfgDnodeReq(&cfgReq);
    TAOS_RETURN(code);
  }

  SDCfgDnodeReq dcfgReq = {0};
  tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
  tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
  // Take everything still needed out of cfgReq before it is released.
  int32_t dnodeId = cfgReq.dnodeId;
  tFreeSMCfgDnodeReq(&cfgReq);
  return mndProcessCreateEncryptKeyReqImpl(pReq, dnodeId, &dcfgReq);

#else
  TAOS_RETURN(code);
#endif
}
1708

1709
/*
 * Handles one dnode's response to a create-encrypt-key request.
 *
 * Bumps encryptMgmt.nSuccess or encryptMgmt.nFailed depending on pRsp->code,
 * and clears encryptMgmt.encrypting once the two counters together reach the
 * number of requests sent (encryptMgmt.nEncrypt). Always returns 0.
 */
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
  SMnode *pMnode = pRsp->info.node;
  int16_t nSuccess = 0;
  int16_t nFailed = 0;

  // Update one counter and read the other: completion depends on their sum, so
  // leaving the untouched counter at 0 would never finish a round that had
  // both successes and failures.
  if (0 == pRsp->code) {
    nSuccess = atomic_add_fetch_16(&pMnode->encryptMgmt.nSuccess, 1);
    nFailed = atomic_load_16(&pMnode->encryptMgmt.nFailed);
  } else {
    nFailed = atomic_add_fetch_16(&pMnode->encryptMgmt.nFailed, 1);
    nSuccess = atomic_load_16(&pMnode->encryptMgmt.nSuccess);
  }

  int16_t nReq = atomic_load_16(&pMnode->encryptMgmt.nEncrypt);
  bool    finished = nSuccess + nFailed >= nReq;

  if (finished) {
    atomic_store_8(&pMnode->encryptMgmt.encrypting, 0);
  }

  // Referenced by the mGInfo logging macro below.
  const STraceId *trace = &pRsp->info.traceId;
  mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 ", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq,
         nSuccess, nFailed, finished ? "encrypt done" : "in encrypting");

  return 0;
}
1733

1734
/*
 * Fills pBlock with one row per exposed configuration item: column 0 holds the
 * option name and column 1 its value formatted as text (both as var-strings).
 *
 * The items are statusIntervalMs, timezone, locale, charset, monitor,
 * monitorInterval, slowLogThreshold, slowLogMaxLen and slowLogScope.
 * The `rows` argument is not used. If writing a cell fails, the rows written so
 * far are still counted. Returns the number of rows written and adds it to
 * pShow->numOfRows.
 */
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  char   *names[TSDB_CONFIG_NUMBER] = {0};
  char    values[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
  char    scopeText[64] = {0};
  char    nameCell[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
  char    valueCell[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  int32_t itemCount = 0;
  int32_t written = 0;
  int32_t code = 0;
  int32_t lino = 0;

  // Collect the name/value pairs first; each snprintf fills the slot just named.
  names[itemCount] = "statusIntervalMs";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs);

  names[itemCount] = "timezone";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);

  names[itemCount] = "locale";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);

  names[itemCount] = "charset";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);

  names[itemCount] = "monitor";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);

  names[itemCount] = "monitorInterval";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);

  names[itemCount] = "slowLogThreshold";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);

  names[itemCount] = "slowLogMaxLen";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);

  getSlowLogScopeString(tsSlowLogScope, scopeText);
  names[itemCount] = "slowLogScope";
  (void)snprintf(values[itemCount++], TSDB_CONFIG_VALUE_LEN, "%s", scopeText);

  // Emit the collected pairs as rows.
  for (int32_t item = 0; item < itemCount; ++item) {
    SColumnInfoData *pCol = NULL;

    STR_WITH_MAXSIZE_TO_VARSTR(nameCell, names[item], TSDB_CONFIG_OPTION_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 0);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, written, (const char *)nameCell, false), &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(valueCell, values[item], TSDB_CONFIG_VALUE_LEN);
    pCol = taosArrayGet(pBlock->pDataBlock, 1);
    TAOS_CHECK_GOTO(colDataSetVal(pCol, written, (const char *)valueCell, false), &lino, _OVER);

    written++;
  }

_OVER:
  if (code != 0) {
    mError("failed to retrieve configs at line:%d since %s", lino, tstrerror(code));
  }
  pShow->numOfRows += written;
  return written;
}
1805

UNCOV
1806
/* Cancel hook for the configs listing: intentionally does nothing. */
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {
  (void)pMnode;
  (void)pIter;
}
×
1807

1808
/*
 * Fills pBlock with up to `rows` dnode rows, resuming from pShow->pIter.
 *
 * Columns, in order: id, endpoint, vnode count, supported vnodes, status,
 * create time, reboot time, offline reason and (TD_ENTERPRISE only) machine id.
 * Status is "ready"/"creating"/"dropping" for an online dnode; an offline one
 * shows "creating*", "dropping*" or "offline". The offline-reason column is
 * empty for online dnodes.
 *
 * Returns the number of rows written and adds it to pShow->numOfRows. A cell
 * write failure stops the scan; rows completed before it are still counted.
 */
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  ESdbStatus objStatus = 0;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  char       buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE];
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
    if (pShow->pIter == NULL) break;
    bool online = mndIsDnodeOnline(pDnode, curMs);

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false), pDnode, &lino, _OVER);

    STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&id, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false), pDnode,
                        &lino, _OVER);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!online) {
      if (objStatus == SDB_STATUS_CREATING)
        status = "creating*";
      else if (objStatus == SDB_STATUS_DROPPING)
        status = "dropping*";
      else
        status = "offline";
    }

    STR_TO_VARSTR(buf, status);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false), pDnode, &lino,
                        _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->rebootTime, false), pDnode, &lino,
                        _OVER);

    // Offline reason: heap buffer sized for the reason text plus var-string header.
    const char *reason = online ? "" : offlineReason[pDnode->offlineReason];
    char       *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
    if (b == NULL) {
      // Bail out through the common error path instead of writing through NULL.
      RETRIEVE_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, pDnode, &lino, _OVER);
    }
    STR_TO_VARSTR(b, reason);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    // Free the buffer before acting on the result so a failed write does not leak it.
    code = colDataSetVal(pColInfo, numOfRows, b, false);
    taosMemoryFreeClear(b);
    RETRIEVE_CHECK_GOTO(code, pDnode, &lino, _OVER);

#ifdef TD_ENTERPRISE
    STR_TO_VARSTR(buf, pDnode->machineId);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, buf, false), pDnode, &lino, _OVER);
#endif

    numOfRows++;
    sdbRelease(pSdb, pDnode);
  }

_OVER:
  if (code != 0) mError("failed to retrieve dnodes at line:%d since %s", lino, tstrerror(code));

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1890

1891
/* Cancel hook for the dnode listing: closes the SDB_DNODE iterator pIter. */
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_DNODE);
}
×
1895

1896
/*
 * Collects a heap-allocated copy of every dnode's fqdn.
 *
 * Returns an SArray of `char *` (each element allocated with taosStrdup), or
 * NULL when the array cannot be created or a string copy fails; in the latter
 * case the copies made so far are freed. A failure to append one fqdn is
 * logged and that entry is skipped.
 */
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
  int32_t    code = 0;
  SDnodeObj *pObj = NULL;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArray    *fqdns = taosArrayInit(4, sizeof(void *));
  if (fqdns == NULL) {
    mError("failed to init fqdns array");
    return NULL;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    char *fqdn = taosStrdup(pObj->fqdn);
    if (fqdn == NULL) {
      // Capture the error first, then log while pObj is still held.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to strdup fqdn:%s", pObj->fqdn);
      sdbRelease(pSdb, pObj);
      // The scan is abandoned early, so close the iterator explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    if (taosArrayPush(fqdns, &fqdn) == NULL) {
      mError("failed to fqdn into array, but continue at this time");
      // The copy did not make it into the array; nothing else will free it.
      taosMemoryFreeClear(fqdn);
    }
    sdbRelease(pSdb, pObj);
  }

  if (code != 0) {
    for (int32_t i = 0; i < taosArrayGetSize(fqdns); i++) {
      char *pFqdn = (char *)taosArrayGetP(fqdns, i);
      taosMemoryFreeClear(pFqdn);
    }
    taosArrayDestroy(fqdns);
    fqdns = NULL;
  }

  return fqdns;
}
1938

1939
/*
 * Answers a dnode's key-sync request.
 *
 * With TD_ENTERPRISE and TD_HAS_TAOSK: the response reports whether this
 * mnode's keys are loaded and its key version (0 when not loaded). If the
 * dnode's version differs, needUpdate is set and the keys, algorithm and
 * timestamps are included (left zeroed when no keys are loaded).
 * Otherwise: a warning is logged and the response carries version 0 with
 * needUpdate 0.
 *
 * On success the serialized response is attached to pReq->info.rsp/rspLen.
 * Returns 0 or an error code.
 */
static int32_t mndProcessKeySyncReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SKeySyncReq req = {0};
  SKeySyncRsp rsp = {0};
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     contLen = 0;
  void       *pHead = NULL;

  (void)pMnode;

  code = tDeserializeSKeySyncReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize key sync req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received key sync req from dnode:%d, keyVersion:%d", req.dnodeId, req.keyVersion);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  const bool    keysLoaded = (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED);
  const int32_t keyVersion = keysLoaded ? tsEncryptKeyVersion : 0;

  rsp.encryptionKeyStatus = keysLoaded ? TSDB_ENCRYPT_KEY_STAT_LOADED : TSDB_ENCRYPT_KEY_STAT_DISABLED;
  rsp.keyVersion = keyVersion;

  // Check if dnode needs update
  if (req.keyVersion != keyVersion) {
    mInfo("dnode:%d key version mismatch, mnode:%d, dnode:%d, will send keys", req.dnodeId, keyVersion, req.keyVersion);
    rsp.needUpdate = 1;

    // rsp starts zeroed, so without loaded keys the key fields stay empty.
    if (keysLoaded) {
      // Stage each key in an ENCRYPT_KEY_LEN-bounded buffer before copying it
      // into the response, as the response fields may have a different size.
      char svrKey[ENCRYPT_KEY_LEN + 1] = {0};
      char dbKey[ENCRYPT_KEY_LEN + 1] = {0};
      char cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
      char metaKey[ENCRYPT_KEY_LEN + 1] = {0};
      char dataKey[ENCRYPT_KEY_LEN + 1] = {0};

      tstrncpy(svrKey, tsSvrKey, ENCRYPT_KEY_LEN + 1);
      tstrncpy(dbKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
      tstrncpy(cfgKey, tsCfgKey, ENCRYPT_KEY_LEN + 1);
      tstrncpy(metaKey, tsMetaKey, ENCRYPT_KEY_LEN + 1);
      tstrncpy(dataKey, tsDataKey, ENCRYPT_KEY_LEN + 1);

      tstrncpy(rsp.svrKey, svrKey, sizeof(rsp.svrKey));
      tstrncpy(rsp.dbKey, dbKey, sizeof(rsp.dbKey));
      tstrncpy(rsp.cfgKey, cfgKey, sizeof(rsp.cfgKey));
      tstrncpy(rsp.metaKey, metaKey, sizeof(rsp.metaKey));
      tstrncpy(rsp.dataKey, dataKey, sizeof(rsp.dataKey));
      rsp.algorithm = (int32_t)tsEncryptAlgorithmType;
      rsp.createTime = (int64_t)tsEncryptKeyCreateTime;
      rsp.svrKeyUpdateTime = (int64_t)tsSvrKeyUpdateTime;
      rsp.dbKeyUpdateTime = (int64_t)tsDbKeyUpdateTime;
    }
  } else {
    mInfo("dnode:%d key version matches, version:%d", req.dnodeId, keyVersion);
    rsp.needUpdate = 0;
  }
#else
  // Community edition - no encryption support
  mWarn("enterprise features not enabled, key sync not supported");
  rsp.keyVersion = 0;
  rsp.needUpdate = 0;
#endif

  // First pass computes the encoded size, second pass fills the buffer.
  contLen = tSerializeSKeySyncRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = contLen;
    goto _OVER;
  }

  pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  contLen = tSerializeSKeySyncRsp(pHead, contLen, &rsp);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    code = contLen;
    goto _OVER;
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;

_OVER:
  if (code != 0) {
    mError("failed to process key sync req, since %s", tstrerror(code));
  }
  return code;
}
2049

UNCOV
2050
/* Handler for key-sync responses: nothing to do, always returns 0. */
static int32_t mndProcessKeySyncRsp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
×
2051

2052
/*
 * Maps an sdb object of the given type to its dnode object.
 * An SDB_DNODE object is returned as-is; SDB_QNODE, SDB_SNODE and SDB_BNODE
 * objects yield their pDnode member. Returns NULL for a NULL object or any
 * other type.
 */
static SDnodeObj *getDnodeObjByType(void *p, ESdbType type) {
  SDnodeObj *pDnode = NULL;

  if (p != NULL) {
    if (type == SDB_DNODE) {
      pDnode = (SDnodeObj *)p;
    } else if (type == SDB_QNODE) {
      pDnode = ((SQnodeObj *)p)->pDnode;
    } else if (type == SDB_SNODE) {
      pDnode = ((SSnodeObj *)p)->pDnode;
    } else if (type == SDB_BNODE) {
      pDnode = ((SBnodeObj *)p)->pDnode;
    }
  }

  return pDnode;
}
2069
/*
 * Appends the endpoint set of every sdb object of `type` to pAddr.
 *
 * Objects that cannot be mapped to a dnode (see getDnodeObjByType) are logged
 * and skipped. Returns 0 on success, or an error code if appending to pAddr
 * fails; entries appended before the failure remain in pAddr.
 */
static int32_t mndGetAllNodeAddrByType(SMnode *pMnode, ESdbType type, SArray *pAddr) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    void *pObj = NULL;
    pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SDnodeObj *pDnodeObj = getDnodeObjByType(pObj, type);
    if (pDnodeObj == NULL) {
      mError("null dnode object for type:%d", type);
      sdbRelease(pSdb, pObj);
      continue;
    }

    SEpSet epSet = mndGetDnodeEpset(pDnodeObj);
    if (taosArrayPush(pAddr, &epSet) == NULL) {
      // Always leave with a non-zero code, even if terrno was not set, so the
      // object is released exactly once and the failure is reported.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to push addr into array");
      sdbRelease(pSdb, pObj);
      // The scan is abandoned early, so close the iterator explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    sdbRelease(pSdb, pObj);
  }

  return code;
}
2099

2100
/*
 * Appends the endpoint sets of all qnodes, snodes, bnodes and dnodes (in that
 * order) to pAddr. Returns TSDB_CODE_INVALID_PARA when either argument is
 * NULL, the first failing collection's code otherwise, and 0 on success.
 */
static int32_t mndGetAllNodeAddr(SMnode *pMnode, SArray *pAddr) {
  static const ESdbType nodeTypes[] = {SDB_QNODE, SDB_SNODE, SDB_BNODE, SDB_DNODE};
  const int32_t         numTypes = (int32_t)(sizeof(nodeTypes) / sizeof(nodeTypes[0]));

  if (pMnode == NULL || pAddr == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  for (int32_t t = 0; t < numTypes; ++t) {
    int32_t code = mndGetAllNodeAddrByType(pMnode, nodeTypes[t], pAddr);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
2122

2123
/**
 * Handle a "reload dnode TLS" request: validate it, collect the node
 * endpoints via mndGetAllNodeAddr() and send TDMT_DND_RELOAD_DNODE_TLS to
 * each of them.
 *
 * Sending is best-effort: a failed send is logged and the remaining endpoints
 * are still tried.
 *
 * @param pReq incoming rpc message carrying a serialized SMCfgDnodeReq.
 * @return 0 if every step succeeded; otherwise the deserialization,
 *         privilege, allocation or address-collection error, or the code of
 *         the first send that failed.
 */
static int32_t mndProcessUpdateDnodeReloadTls(SRpcMsg *pReq) {
  int32_t       code = 0;
  int32_t       numOfAddrs = 0;
  SMnode       *pMnode = pReq->info.node;
  // Initialised before the first `goto _OVER` so the cleanup code never sees
  // an indeterminate pointer.
  SArray       *pAddr = NULL;
  SMCfgDnodeReq req = {0};

  mInfo("start to reload dnode tls config");

  if ((code = tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_ALTER_DNODE_RELOAD_TLS)) != 0) {
    goto _OVER;
  }

  pAddr = taosArrayInit(4, sizeof(SEpSet));
  if (pAddr == NULL) {
    // Do not fall through with a NULL array when terrno happens to be 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = mndGetAllNodeAddr(pMnode, pAddr)) != 0) {
    mError("failed to collect node addrs for reload tls since %s", tstrerror(code));
    goto _OVER;
  }

  numOfAddrs = (int32_t)taosArrayGetSize(pAddr);
  for (int32_t i = 0; i < numOfAddrs; i++) {
    SEpSet *pEpSet = (SEpSet *)taosArrayGet(pAddr, i);
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_RELOAD_DNODE_TLS, .pCont = NULL, .contLen = 0};
    int32_t ret = tmsgSendReq(pEpSet, &rpcMsg);
    if (ret != 0) {
      mError("failed to send reload tls req to dnode addr:%s since %s", pEpSet->eps[0].fqdn, tstrerror(ret));
      // Keep the first failure so the result does not depend on send order.
      if (code == 0) code = ret;
    }
  }

_OVER:
  tFreeSMCfgDnodeReq(&req);
  taosArrayDestroy(pAddr);
  return code;
}
2161

2162
/**
 * Log the outcome of a reload-TLS response.
 *
 * @param pRsp response message; only pRsp->code is inspected.
 * @return always 0 -- the response code is logged, not propagated.
 */
static int32_t mndProcessReloadDnodeTlsRsp(SRpcMsg *pRsp) {
  if (pRsp->code == 0) {
    mInfo("succeed to reload dnode tls config");
    return 0;
  }

  mError("failed to reload dnode tls config since %s", tstrerror(pRsp->code));
  return 0;
}
2171

2172
/**
 * Validate an alter-encrypt-key request and send it to every online dnode.
 *
 * Dnodes whose offlineReason is not DND_REASON_ONLINE are skipped with a
 * warning. Per-dnode allocation and send failures are logged and do not stop
 * the loop; a serialization failure aborts the whole operation.
 *
 * @param pReq      original rpc request (used for the sdb handle and tracing).
 * @param pAlterReq decoded request; keyType must be 0 or 1 and the length of
 *                  newKey must be within [ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN].
 * @return 0 on success, TSDB_CODE_INVALID_PARA / TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN
 *         on validation failure, or the serialization error.
 */
static int32_t mndProcessAlterEncryptKeyReqImpl(SRpcMsg *pReq, SMAlterEncryptKeyReq *pAlterReq) {
  int32_t code = 0;
  int32_t reqLen = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // Referenced implicitly by the mG* logging macros.
  const STraceId *trace = &pReq->info.traceId;

  // Validate key type
  if (pAlterReq->keyType != 0 && pAlterReq->keyType != 1) {
    mGError("msg:%p, failed to alter encrypt key since invalid key type:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", pReq,
            pAlterReq->keyType);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate new key length
  int32_t klen = strlen(pAlterReq->newKey);
  if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
    mGError("msg:%p, failed to alter encrypt key since invalid key length:%d, valid range:[%d, %d]", pReq, klen,
            ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN;
  }

  // Prepare SMAlterEncryptKeyReq for distribution to dnodes
  SMAlterEncryptKeyReq alterKeyReq = {0};
  alterKeyReq.keyType = pAlterReq->keyType;
  tstrncpy(alterKeyReq.newKey, pAlterReq->newKey, sizeof(alterKeyReq.newKey));
  alterKeyReq.sqlLen = 0;
  alterKeyReq.sql = NULL;

  // The encoded size is identical for every dnode: probe it once and make
  // sure it is usable before allocating with it.
  reqLen = tSerializeSMAlterEncryptKeyReq(NULL, 0, &alterKeyReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    mGError("msg:%p, failed to serialize alter encrypt_key req, len:%d", pReq, reqLen);
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter encrypt_key req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    void *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      // Best-effort distribution: report the failure instead of skipping silently.
      mGError("msg:%p, failed to alloc alter encrypt_key req for dnode:%d", pReq, pDnode->id);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    int32_t bufLen = tSerializeSMAlterEncryptKeyReq(pBuf, reqLen, &alterKeyReq);
    if (bufLen <= 0) {
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);            // buffer was never handed to the transport
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);  // leaving the fetch loop early
      goto _exit;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_ENCRYPT_KEY, .pCont = pBuf, .contLen = bufLen};
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter encrypt_key req to dnode:%d, error:%s", pReq, pDnode->id, tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter encrypt_key req to dnode:%d, keyType:%d", pReq, pDnode->id, pAlterReq->keyType);
    }

    sdbRelease(pSdb, pDnode);
  }

  // Note: mnode runs on dnode, so the local keys will be updated by dnode itself
  // when it receives the alter encrypt key request from mnode
  mGInfo("msg:%p, successfully sent alter encrypt key request to all dnodes, keyType:%d", pReq, pAlterReq->keyType);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2247

2248
/**
 * Entry point for an alter-encrypt-key request.
 *
 * Checks the MND_OPER_CONFIG_DNODE privilege, decodes the request and, when
 * built with TD_ENTERPRISE and TD_HAS_TAOSK, hands it to
 * mndProcessAlterEncryptKeyReqImpl() and writes an audit record on success.
 * Other builds answer with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * @param pReq incoming rpc message carrying a serialized SMAlterEncryptKeyReq.
 * @return result code, returned through TAOS_RETURN().
 */
static int32_t mndProcessAlterEncryptKeyReq(SRpcMsg *pReq) {
  int32_t              lino = 0;
  int32_t              code = TSDB_CODE_SUCCESS;
  SMAlterEncryptKeyReq alterReq = {0};
  SMnode              *pMnode = pReq->info.node;

  // Privilege gate comes first; nothing is decoded for an unauthorized caller.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Decode the request body
  if ((code = tDeserializeSMAlterEncryptKeyReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter encrypt key req, keyType:%d", alterReq.keyType);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and fan out to the dnodes; audit only when that succeeded
  code = mndProcessAlterEncryptKeyReqImpl(pReq, &alterReq);
  if (code == 0) {
    const char *keyName = (alterReq.keyType == 0) ? "SVR_KEY" : "DB_KEY";
    auditRecord(pReq, pMnode->clusterId, "alterEncryptKey", "", keyName, alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Build without encryption key management
  mError("encryption key management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("failed to alter encrypt key, keyType:%d, since %s", alterReq.keyType, tstrerror(code));
  }

  tFreeSMAlterEncryptKeyReq(&alterReq);
  TAOS_RETURN(code);
}
2289

2290
/**
 * Validate an alter-key-expiration request and send it to every online dnode.
 *
 * Dnodes whose offlineReason is not DND_REASON_ONLINE are skipped with a
 * warning. Per-dnode allocation and send failures are logged and do not stop
 * the loop; a serialization failure aborts the whole operation.
 *
 * @param pReq      original rpc request (used for the sdb handle and tracing).
 * @param pAlterReq decoded request; days must be >= 0 and strategy non-empty.
 * @return 0 on success, TSDB_CODE_INVALID_PARA on validation failure, or the
 *         serialization error.
 */
static int32_t mndProcessAlterKeyExpirationReqImpl(SRpcMsg *pReq, SMAlterKeyExpirationReq *pAlterReq) {
  int32_t code = 0;
  int32_t reqLen = 0;
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  // Referenced implicitly by the mG* logging macros.
  const STraceId *trace = &pReq->info.traceId;

  // Validate days value
  if (pAlterReq->days < 0) {
    mGError("msg:%p, failed to alter key expiration since invalid days:%d, must be >= 0", pReq, pAlterReq->days);
    return TSDB_CODE_INVALID_PARA;
  }

  // Validate strategy
  if (strlen(pAlterReq->strategy) == 0) {
    mGError("msg:%p, failed to alter key expiration since empty strategy", pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  // Prepare SMAlterKeyExpirationReq for distribution to dnodes
  SMAlterKeyExpirationReq alterReq = {0};
  alterReq.days = pAlterReq->days;
  tstrncpy(alterReq.strategy, pAlterReq->strategy, sizeof(alterReq.strategy));
  alterReq.sqlLen = 0;
  alterReq.sql = NULL;

  // The encoded size is identical for every dnode: probe it once and make
  // sure it is usable before allocating with it.
  reqLen = tSerializeSMAlterKeyExpirationReq(NULL, 0, &alterReq);
  if (reqLen <= 0) {
    code = (reqLen < 0) ? reqLen : TSDB_CODE_INVALID_PARA;
    mGError("msg:%p, failed to serialize alter key_expiration req, len:%d", pReq, reqLen);
    goto _exit;
  }

  // Send request to all online dnodes
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    if (pDnode->offlineReason != DND_REASON_ONLINE) {
      mGWarn("msg:%p, don't send alter key_expiration req since dnode:%d in offline state:%s", pReq, pDnode->id,
             offlineReason[pDnode->offlineReason]);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    void *pBuf = rpcMallocCont(reqLen);
    if (pBuf == NULL) {
      // Best-effort distribution: report the failure instead of skipping silently.
      mGError("msg:%p, failed to alloc alter key_expiration req for dnode:%d", pReq, pDnode->id);
      sdbRelease(pSdb, pDnode);
      continue;
    }

    int32_t bufLen = tSerializeSMAlterKeyExpirationReq(pBuf, reqLen, &alterReq);
    if (bufLen <= 0) {
      code = (bufLen < 0) ? bufLen : TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pBuf);            // buffer was never handed to the transport
      sdbRelease(pSdb, pDnode);
      sdbCancelFetch(pSdb, pIter);  // leaving the fetch loop early
      goto _exit;
    }

    SEpSet  epSet = mndGetDnodeEpset(pDnode);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_ALTER_KEY_EXPIRATION, .pCont = pBuf, .contLen = bufLen};
    int32_t ret = tmsgSendReq(&epSet, &rpcMsg);
    if (ret != 0) {
      mGError("msg:%p, failed to send alter key_expiration req to dnode:%d, error:%s", pReq, pDnode->id,
              tstrerror(ret));
    } else {
      mGInfo("msg:%p, send alter key_expiration req to dnode:%d, days:%d, strategy:%s", pReq, pDnode->id,
             pAlterReq->days, pAlterReq->strategy);
    }

    sdbRelease(pSdb, pDnode);
  }

  mGInfo("msg:%p, successfully sent alter key expiration request to all dnodes, days:%d, strategy:%s", pReq,
         pAlterReq->days, pAlterReq->strategy);

_exit:
  if (code != 0) {
    if (terrno == 0) terrno = code;
  }
  return code;
}
2363

2364
/**
 * Entry point for an alter-key-expiration request.
 *
 * Checks the MND_OPER_CONFIG_DNODE privilege, decodes the request and, when
 * built with TD_ENTERPRISE and TD_HAS_TAOSK, hands it to
 * mndProcessAlterKeyExpirationReqImpl() and writes an audit record on
 * success. Other builds answer with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * @param pReq incoming rpc message carrying a serialized SMAlterKeyExpirationReq.
 * @return result code, returned through TAOS_RETURN().
 */
static int32_t mndProcessAlterKeyExpirationReq(SRpcMsg *pReq) {
  int32_t                 lino = 0;
  int32_t                 code = TSDB_CODE_SUCCESS;
  SMAlterKeyExpirationReq alterReq = {0};
  SMnode                 *pMnode = pReq->info.node;

  // Privilege gate comes first; nothing is decoded for an unauthorized caller.
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, RPC_MSG_TOKEN(pReq), MND_OPER_CONFIG_DNODE),
                  &lino, _OVER);

  // Decode the request body
  if ((code = tDeserializeSMAlterKeyExpirationReq(pReq->pCont, pReq->contLen, &alterReq)) != 0) {
    mError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
    goto _OVER;
  }

  mInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);

#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  // Validate and fan out to the dnodes; audit only when that succeeded
  code = mndProcessAlterKeyExpirationReqImpl(pReq, &alterReq);
  if (code == 0) {
    char detail[128];
    snprintf(detail, sizeof(detail), "%d DAYS %s", alterReq.days, alterReq.strategy);
    auditRecord(pReq, pMnode->clusterId, "alterKeyExpiration", "", detail, alterReq.sql, alterReq.sqlLen, 0, 0);
  }
#else
  // Build without encryption key management
  mError("key expiration management is only available in enterprise edition");
  code = TSDB_CODE_OPS_NOT_SUPPORT;
#endif

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("failed to alter key expiration, days:%d, strategy:%s, since %s", alterReq.days, alterReq.strategy,
           tstrerror(code));
  }

  tFreeSMAlterKeyExpirationReq(&alterReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc