• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5050

12 May 2026 05:36AM UTC coverage: 73.398% (+0.09%) from 73.313%
#5050

push

travis-ci

web-flow
merge: from main to 3.0 branch #35319

90 of 101 new or added lines in 2 files covered. (89.11%)

489 existing lines in 125 files now uncovered.

281602 of 383662 relevant lines covered (73.4%)

138099127.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.74
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void     setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId);
35

36
#ifndef TD_ENTERPRISE
37
void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
void getAuditDbNameToken(char *pDb, char *pToken) {}
44
#endif
45

46
extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId);
47

48
#ifndef TD_ENTERPRISE
49
void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {}
50
#endif
51

52
SMonVloadInfo tsVinfo = {0};
53
SMnodeLoad    tsMLoad = {0};
54
SDnodeData    tsDnodeData = {0};
55

56
/*
 * Adopt the dnode id and cluster id carried in pCfg when the local ones are unset.
 *
 * pMgmt: dnode management context; pMgmt->pData holds the local identity.
 * pCfg:  dnode config taken from the mnode status response.
 *
 * Nothing happens unless the local dnodeId or clusterId is still 0. Otherwise,
 * under the pData write lock, both ids are copied, the monitor and audit modules
 * are told the new dnode id, and the result is written out with dmWriteEps().
 * A write failure is logged as an error; it is not propagated to the caller.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // A failed write is an error condition, so report it at error level rather than info.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
2,439,813✔
73

74
/*
 * Request the IP white list from the mnode when the local version differs from
 * the version reported in the status response.
 *
 * pMgmt: dnode management context; reads pData->ipWhiteVer and uses msgCb for RPC.
 * ver:   IP white list version carried by the status response.
 *
 * When the versions match no request is sent; if that common version is 0,
 * rpcSetIpWhite() is called on the server RPC with a NULL list (logged as
 * disabling the white list). Otherwise a TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL
 * request carrying the local version is sent with rpcSendRequest().
 * Every failure is logged and otherwise ignored.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveWhiteListReq req = {.ver = oldVer};
  int32_t               contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard the key-sync and config senders apply before serializing into the buffer.
    dError("failed to malloc cont for ip white list request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
119

120

121

122
/*
 * Request the datetime white list from the mnode when the local version differs
 * from the version reported in the status response.
 *
 * pMgmt: dnode management context; reads pData->timeWhiteVer and uses msgCb for RPC.
 * ver:   datetime white list version carried by the status response.
 *
 * When the versions match no request is sent. Otherwise a
 * TDMT_MND_RETRIEVE_DATETIME_WHITELIST request carrying the local version is
 * sent with rpcSendRequest(). Every failure is logged and otherwise ignored.
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this calls rpcSetIpWhite(), the same call the ip white list path uses to
      // disable itself - confirm that is intended for the time white list as well.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->timeWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveWhiteListReq req = {.ver = oldVer};
  int32_t               contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard the key-sync and config senders apply before serializing into the buffer.
    dError("failed to malloc cont for datetime white list request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
167

168

169

170
/*
 * Request the analytics algorithm list from the mnode when the local analytics
 * version differs from the one reported in the status response.
 *
 * pMgmt:  dnode management context; supplies the dnode id and the client RPC.
 * newVer: analytics version carried by the status response.
 *
 * The local version comes from taosAnalyGetVersion(). When it equals newVer the
 * function returns immediately. Otherwise a TDMT_MND_RETRIEVE_ANAL_ALGO request
 * carrying the dnode id and the local version is sent with rpcSendRequest().
 * Every failure is logged and otherwise ignored.
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  int32_t                   contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard the key-sync and config senders apply before serializing into the buffer.
    dError("failed to malloc cont for analysis function ver request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
209

210
/*
 * Handle the mnode's answer to a status request.
 *
 * pMgmt: dnode management context.
 * pRsp:  response message; pRsp->pCont is released with rpcFreeCont() before returning.
 *
 * Success (code == 0) with a decodable body:
 *   - when the dnode version changed, store it and refresh the local dnode
 *     config and endpoint list;
 *   - hand the audit db/token/epset/vgId to setAuditDbNameToken();
 *   - run the ip white list, time white list and analytics version checks.
 * TSDB_CODE_MND_DNODE_NOT_EXIST while not yet dropped and dnodeId > 0:
 *   mark the dnode dropped, write that out, and raise SIGINT.
 * Any other error code: only the response content is freed.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // The dG* log macros are used with this local in scope; keep its name.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasBody = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasBody && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }

      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId);
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }

    // Released whether or not decoding succeeded.
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
48,664,550✔
245

246
/*
 * Build a status request from the cached dnode state, send it to the mnode on
 * the status RPC with a timeout, and process the response.
 *
 * pMgmt: dnode management context. statusSeq is incremented on every call.
 *
 * Steps:
 *   1. Under statusInfolock, copy identity/config values from tsDnodeData and
 *      the ts* globals into the request, and take over tsVinfo.pVloads
 *      (tsVinfo.pVloads is reset to NULL) plus a copy of tsMLoad.
 *   2. Outside the lock, add qnode loads, white list / analytics versions and,
 *      depending on tsAuditUseToken / tsAuditSaveInSelf, the audit fields.
 *   3. Serialize into an RPC buffer and release the request with tFreeSStatusReq().
 *   4. rpcSendRecvWithTimeout(); on a transport error log it and, for
 *      TSDB_CODE_TIMEOUT_ERROR, rotate the mnode ep set. On a response with a
 *      non-zero code rotate the ep set; on success store the ep set if it was
 *      updated. In both response cases hand the response to dmProcessStatusRsp().
 *
 * Once pVloads has been taken over, every early return releases the request via
 * tFreeSStatusReq() so the array is not leaked.
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  // Without a cluster id the dnode id is reported as 0 as well.
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  req.clusterCfg.checkTime = (int64_t)taosGetLocalTimezoneOffset(&code);
  if (code != 0) {
    // pVloads has not been taken over yet, so only the lock needs releasing here.
    dError("failed to get local timezone offset, since %s", tstrerror(code));
    (void)taosThreadMutexUnlock(&pMgmt->pData->statusInfolock);
    return;
  }

  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Take over the cached vnode loads; from here on the request must be released on every exit.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  if (tsAuditUseToken) {
    getAuditDbNameToken(req.auditDB, req.auditToken);
  }

  if (tsAuditSaveInSelf) {
    getAuditEpSet(&req.auditEpSet, &req.auditVgId);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req, size:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar fields of req are read after it has been released.
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // Report the transport error that triggered the rotation, not the response code.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
388

389
/*
 * Handle the mnode's answer to a config request.
 *
 * pMgmt: dnode management context; pMgmt->path is where config is persisted.
 * pRsp:  response message; pRsp->pCont is released with rpcFreeCont() before returning.
 *
 * Non-zero response code: for TSDB_CODE_MND_DNODE_NOT_EXIST while not yet
 * dropped and dnodeId > 0, mark the dnode dropped, write that out and raise
 * SIGINT; other codes are ignored.
 *
 * Zero response code: when the body decodes and reports the config version as
 * not verified, persist the received global config, apply it to tsCfg and call
 * setAllConfigs(). Then persist the local config and set tsConfigInited = 1.
 * A failure while persisting or applying the global config skips the rest,
 * leaving tsConfigInited untouched.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // The dG* log macros are used with this local in scope; keep its name.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
  } else {
    bool needUpdate = false;
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
      // Try to use cfg from mnode sdb.
      if (!configRsp.isVersionVerified) {
        uInfo("config version not verified, update config");
        needUpdate = true;
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
        if (code != TSDB_CODE_SUCCESS) {
          dError("failed to persist global config since %s", tstrerror(code));
          goto _exit;
        }
      }
    }
    if (needUpdate) {
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to update config since %s", tstrerror(code));
        goto _exit;
      }
      code = setAllConfigs(tsCfg);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to set all configs since %s", tstrerror(code));
        goto _exit;
      }
    }
    // A local persist failure is logged only; the config is still marked initialized.
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
    tsConfigInited = 1;
  }
_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
715,897✔
445

446
/*
 * Apply the mnode's answer to a key sync request.
 *
 * pMgmt: dnode management context (not read by this function).
 * pRsp:  response message; pRsp->pCont is released with rpcFreeCont() on every path.
 *
 * Returns 0 on success, pRsp->code when the response carries an error,
 * TSDB_CODE_INVALID_MSG for an empty body, or the error from decoding or from
 * saving the keys.
 *
 * tsEncryptKeysStatus is always refreshed from a decoded response. When the
 * response asks for an update and the build has TD_ENTERPRISE and TD_HAS_TAOSK,
 * the keys are written to master.bin / derived.bin under
 * <tsDataDir>/dnode/config, the ts* key globals and tsLocalKeyVersion are
 * updated, and existing config files are encrypted (a failure there is only a
 * warning). Other builds just log a warning.
 */
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  const STraceId *trace = &pRsp->info.traceId;
  SKeySyncRsp     keySyncRsp = {0};
  int32_t         code = pRsp->code;

  if (code != 0) {
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
    goto _exit;
  }

  if (!(pRsp->pCont != NULL && pRsp->contLen > 0)) {
    dError("invalid key sync response, empty content");
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
  if (code != 0) {
    dError("failed to deserialize key sync response since %s", tstrerror(code));
    goto _exit;
  }

  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
        tsLocalKeyVersion, keySyncRsp.needUpdate);
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;

  if (!keySyncRsp.needUpdate) {
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
  } else {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
    // Both key files live under <tsDataDir>/dnode/config.
    char masterKeyFile[PATH_MAX] = {0};
    char derivedKeyFile[PATH_MAX] = {0};
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);

    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);

    // Persist first; the in-memory globals are only touched once the files are saved.
    // The same algorithm value is passed for all three algorithm arguments.
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey,
                                keySyncRsp.metaKey, keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm,
                                keySyncRsp.algorithm, keySyncRsp.keyVersion, keySyncRsp.createTime,
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
    if (code != 0) {
      dError("failed to save encryption keys since %s", tstrerror(code));
      goto _exit;
    }

    // Mirror the synced keys and their metadata into the process-wide globals.
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;

    tsLocalKeyVersion = keySyncRsp.keyVersion;
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);

    // Best effort: a failure here must not fail the key sync.
    code = taosEncryptExistingCfgFiles(tsDataDir);
    if (code != 0) {
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
      code = 0;
    }
#else
    dWarn("enterprise features not enabled, skipping key sync");
#endif
  }

  code = TSDB_CODE_SUCCESS;

_exit:
  rpcFreeCont(pRsp->pCont);
  return code;
}
532

533
/*
 * Send a key sync request (dnode id plus local key version) to the mnode on the
 * status RPC and apply the response with dmProcessKeySyncRsp().
 *
 * pMgmt: dnode management context.
 *
 * All failures are logged; nothing is returned to the caller. When the mnode
 * answers with a non-zero code the response content is released here, because
 * dmProcessKeySyncRsp() (which otherwise frees it) is not called on that path.
 */
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
  int32_t     code = 0;
  SKeySyncReq req = {0};

  req.dnodeId = pMgmt->pData->dnodeId;
  req.keyVersion = tsLocalKeyVersion;
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so report it as a size.
    dError("failed to malloc cont for key sync req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_KEY_SYNC,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
  if (code != 0) {
    dError("failed to process key sync rsp since %s", tstrerror(code));
  }
}
589

590
/*
 * Send a config request to the mnode on the status RPC and apply the response
 * with dmProcessConfigRsp().
 *
 * pMgmt: dnode management context.
 *
 * The request carries tsdmConfigVersion, forceReadConfig = true and the array
 * returned by taosGetGlobalCfg(tsCfg). All failures are logged; nothing is
 * returned to the caller. When the mnode answers with a non-zero code the
 * response content is released here, because dmProcessConfigRsp() (which
 * otherwise frees it) is not called on that path.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = true;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so report it as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
643

644
/*
 * Refresh the cached status snapshot (tsDnodeData, tsVinfo, tsMLoad) that
 * dmSendStatusReq() later copies into the status request.
 *
 * pMgmt: dnode management context.
 *
 * The dnode identity is read under the pData read lock, and the vnode and
 * mnode loads are collected through the getVnodeLoadsFp / getMnodeLoadsFp
 * callbacks, all before statusInfolock is taken. Under statusInfolock the
 * snapshot is published: the freshly collected vnode loads are stored only when
 * tsVinfo.pVloads is currently NULL, otherwise the new array is destroyed and
 * the cached one kept.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected loads were never published, so release them here.
    taosArrayDestroy(vinfo.pVloads);
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Hand the new array over to the cache.
    tsVinfo.pVloads = vinfo.pVloads;
  } else {
    // The cached loads are still in place; drop the new array.
    taosArrayDestroy(vinfo.pVloads);
  }
  vinfo.pVloads = NULL;

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
691

692
// Serializes a notify request and sends it to the mnode as TDMT_MND_NOTIFY.
//
// The message is sent with info.noResp set, and the function itself returns
// nothing: every failure is only logged.
//
// pMgmt: dnode management context; provides the mnode endpoint set
//        (pMgmt->pData) and the client RPC handle (pMgmt->msgCb.clientRpc).
// pReq:  request to serialize.
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer is used to obtain the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check the second serialize pass and the send would both
    // operate on a NULL content buffer.
    dError("failed to allocate %d bytes for notify req", contLen);
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
  // NOTE(review): pHead is not freed here on send failure, same as before;
  // presumably rpcSendRequest owns the buffer once called - confirm.
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
721

722
// Handler for an auth response. Nothing is processed: the arrival of the
// message is logged as an error and 0 is returned. Neither argument is read.
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t result = 0;

  dError("auth rsp is received, but not supported yet");

  return result;
}
726

727
// Handler for a grant response. Nothing is processed: the arrival of the
// message is logged as an error and 0 is returned. Neither argument is read.
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t result = 0;

  dError("grant rsp is received, but not supported yet");

  return result;
}
731

732
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
106,514✔
733
  int32_t       code = 0;
106,514✔
734
  SDCfgDnodeReq cfgReq = {0};
106,514✔
735
  SConfig      *pCfg = taosGetCfg();
106,514✔
736
  SConfigItem  *pItem = NULL;
106,514✔
737

738
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
106,514✔
739
    return TSDB_CODE_INVALID_MSG;
×
740
  }
741
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
106,514✔
742
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,763✔
743
  }
744

745
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
102,751✔
746

747
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
102,751✔
748
  if (code != 0) {
102,751✔
749
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
260✔
750
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
260✔
751
      return TSDB_CODE_SUCCESS;
260✔
752
    } else {
753
      return code;
×
754
    }
755
  }
756
  if (pItem == NULL) {
102,491✔
757
    return TSDB_CODE_CFG_NOT_FOUND;
×
758
  }
759

760
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
102,491✔
761
    char value[10] = {0};
×
762
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
763
      tsSyncTimeout = 0;
×
764
    }
765

766
    if (tsSyncTimeout > 0) {
×
767
      SConfigItem *pItemTmp = NULL;
×
768
      char         tmp[10] = {0};
×
769

770
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
×
771
      TAOS_CHECK_RETURN(
×
772
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
773
      if (pItemTmp == NULL) {
×
774
        return TSDB_CODE_CFG_NOT_FOUND;
×
775
      }
776

777
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
×
778
      TAOS_CHECK_RETURN(
×
779
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
780
      if (pItemTmp == NULL) {
×
781
        return TSDB_CODE_CFG_NOT_FOUND;
×
782
      }
783
      TAOS_CHECK_RETURN(
×
784
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
785
      if (pItemTmp == NULL) {
×
786
        return TSDB_CODE_CFG_NOT_FOUND;
×
787
      }
788

789
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
790
      TAOS_CHECK_RETURN(
×
791
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
792
      if (pItemTmp == NULL) {
×
793
        return TSDB_CODE_CFG_NOT_FOUND;
×
794
      }
795
      TAOS_CHECK_RETURN(
×
796
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
797
      if (pItemTmp == NULL) {
×
798
        return TSDB_CODE_CFG_NOT_FOUND;
×
799
      }
800
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
801
      if (pItemTmp == NULL) {
×
802
        return TSDB_CODE_CFG_NOT_FOUND;
×
803
      }
804

805
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
806
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
807
      if (pItemTmp == NULL) {
×
808
        return TSDB_CODE_CFG_NOT_FOUND;
×
809
      }
810

811
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
812
      TAOS_CHECK_RETURN(
×
813
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
814
      if (pItemTmp == NULL) {
×
815
        return TSDB_CODE_CFG_NOT_FOUND;
×
816
      }
817
      TAOS_CHECK_RETURN(
×
818
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
819
      if (pItemTmp == NULL) {
×
820
        return TSDB_CODE_CFG_NOT_FOUND;
×
821
      }
822
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
823
      if (pItemTmp == NULL) {
×
824
        return TSDB_CODE_CFG_NOT_FOUND;
×
825
      }
826

827
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
828
            tsSyncTimeout);
829
    }
830
  }
831

832
  if (!isConifgItemLazyMode(pItem)) {
102,491✔
833
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
101,776✔
834

835
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
101,776✔
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
838
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
839

840
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
841
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
842
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
843
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
844

845
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
846
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
847
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
848

849
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
850
            tsSyncTimeout);
851
    }
852
  }
853

854
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
102,491✔
855
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
17,396✔
856
    if (code != TSDB_CODE_SUCCESS) {
17,396✔
857
      dError("failed to persist global config since %s", tstrerror(code));
×
858
    }
859
  } else {
860
    code = taosPersistLocalConfig(pMgmt->path);
85,095✔
861
    if (code != TSDB_CODE_SUCCESS) {
85,095✔
862
      dError("failed to persist local config since %s", tstrerror(code));
×
863
    }
864
  }
865

866
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
102,491✔
867
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
868

869
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
870
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
871
  }
872

873
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
102,491✔
874
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
102,491✔
875
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
876
  }
877

878
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
102,491✔
879
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
102,491✔
880
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
881
  }
882

883
  if (cfgReq.version > 0) {
102,491✔
884
    tsdmConfigVersion = cfgReq.version;
25,738✔
885
  }
886
  return code;
102,491✔
887
}
888

889
// Handles a "create encrypt key" request.
//
// Enterprise build: deserializes the request, passes its value to
// dmUpdateEncryptKey() and, on success, refreshes the in-memory key, its
// checksum and the key status. The outcome is stored in pMsg->code, the
// response payload is cleared, and the same code is returned.
// Other builds: returns 0 without touching pMsg.
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq req = {0};
  int32_t       rc = TSDB_CODE_INVALID_MSG;  // kept if deserialization fails

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &req) == 0) {
    rc = dmUpdateEncryptKey(req.value, true);
    if (rc == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, req.value, strlen(req.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, req.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = rc;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return rc;
#else
  return 0;
#endif
}
914

915
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
916
// Verification plaintext used to validate encryption keys
917
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
918

919
// Save key verification file with encrypted plaintext for each key
920
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,651✔
921
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
922
                                     int32_t metaAlgorithm) {
923
  char    verifyFile[PATH_MAX] = {0};
2,651✔
924
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,651✔
925
                            TD_DIRSEP, TD_DIRSEP);
926
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,651✔
927
    dError("failed to build key verification file path");
×
928
    return TSDB_CODE_OUT_OF_BUFFER;
×
929
  }
930

931
  int32_t     code = 0;
2,651✔
932
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
2,651✔
933
  int32_t     plaintextLen = strlen(plaintext);
2,651✔
934

935
  // Array of keys and their algorithms
936
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
2,651✔
937
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
2,651✔
938
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
2,651✔
939

940
  // Calculate total buffer size
941
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
2,651✔
942
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
2,651✔
943
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
2,651✔
944
  int32_t totalSize = headerSize + perKeySize * 5;
2,651✔
945

946
  // Allocate buffer for all data
947
  char *buffer = taosMemoryMalloc(totalSize);
2,651✔
948
  if (buffer == NULL) {
2,651✔
949
    dError("failed to allocate memory for key verification buffer");
×
950
    return terrno;
×
951
  }
952

953
  char *ptr = buffer;
2,651✔
954

955
  // Write magic number and version to buffer
956
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
2,651✔
957
  uint16_t version = 1;
2,651✔
958
  memcpy(ptr, &magic, sizeof(magic));
2,651✔
959
  ptr += sizeof(magic);
2,651✔
960
  memcpy(ptr, &version, sizeof(version));
2,651✔
961
  ptr += sizeof(version);
2,651✔
962

963
  // Encrypt all keys and write to buffer
964
  char paddedPlaintext[512] = {0};
2,651✔
965
  memcpy(paddedPlaintext, plaintext, plaintextLen);
2,651✔
966

967
  for (int i = 0; i < 5; i++) {
15,906✔
968
    char encrypted[512] = {0};
13,255✔
969

970
    // Encrypt the verification plaintext with this key using CBC
971
    SCryptOpts opts = {0};
13,255✔
972
    opts.len = encryptedLen;
13,255✔
973
    opts.source = paddedPlaintext;
13,255✔
974
    opts.result = encrypted;
13,255✔
975
    opts.unitLen = 16;
13,255✔
976
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
13,255✔
977
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
13,255✔
978

979
    int32_t count = CBC_Encrypt(&opts);
13,255✔
980
    if (count != opts.len) {
13,255✔
981
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
982
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
983
             tstrerror(code));
984
      taosMemoryFree(buffer);
×
985
      return code;
×
986
    }
987

988
    // Write to buffer: algorithm + encrypted length + encrypted data
989
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
13,255✔
990
    ptr += sizeof(int32_t);
13,255✔
991
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
13,255✔
992
    ptr += sizeof(int32_t);
13,255✔
993
    memcpy(ptr, encrypted, encryptedLen);
13,255✔
994
    ptr += encryptedLen;
13,255✔
995

996
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
13,255✔
997
  }
998

999
  // Write all data to file in one operation
1000
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
2,651✔
1001
  if (pFile == NULL) {
2,651✔
1002
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
1003
    taosMemoryFree(buffer);
×
1004
    return TSDB_CODE_FILE_CORRUPTED;
×
1005
  }
1006

1007
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
2,651✔
1008
  (void)taosCloseFile(&pFile);
2,651✔
1009
  taosMemoryFree(buffer);
2,651✔
1010

1011
  if (written != totalSize) {
2,651✔
1012
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1013
    return TSDB_CODE_FILE_CORRUPTED;
×
1014
  }
1015

1016
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
2,651✔
1017
  return 0;
2,651✔
1018
}
1019

1020
// Verify all encryption keys by decrypting and comparing with original plaintext
1021
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,317✔
1022
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1023
                                      int32_t metaAlgorithm) {
1024
  char    verifyFile[PATH_MAX] = {0};
2,317✔
1025
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,317✔
1026
                            TD_DIRSEP, TD_DIRSEP);
1027
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,317✔
1028
    dError("failed to build key verification file path");
×
1029
    return TSDB_CODE_OUT_OF_BUFFER;
×
1030
  }
1031

1032
  // Get file size
1033
  int64_t fileSize = 0;
2,317✔
1034
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
2,317✔
1035
    // File doesn't exist, create it with current keys
1036
    dInfo("key verification file not found, creating new one");
2,317✔
1037
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,317✔
1038
  }
1039

1040
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1041
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1042
    return TSDB_CODE_FILE_CORRUPTED;
×
1043
  }
1044

1045
  // Allocate buffer and read entire file
1046
  char *buffer = taosMemoryMalloc(fileSize);
×
1047
  if (buffer == NULL) {
×
1048
    dError("failed to allocate memory for key verification buffer");
×
1049
    return terrno;
×
1050
  }
1051

1052
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1053
  if (pFile == NULL) {
×
1054
    dError("failed to open key verification file:%s", verifyFile);
×
1055
    taosMemoryFree(buffer);
×
1056
    return TSDB_CODE_FILE_CORRUPTED;
×
1057
  }
1058

1059
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1060
  (void)taosCloseFile(&pFile);
×
1061

1062
  if (bytesRead != fileSize) {
×
1063
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1064
    taosMemoryFree(buffer);
×
1065
    return TSDB_CODE_FILE_CORRUPTED;
×
1066
  }
1067

1068
  int32_t     code = 0;
×
1069
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1070
  int32_t     plaintextLen = strlen(plaintext);
×
1071
  const char *ptr = buffer;
×
1072

1073
  // Parse and verify header
1074
  uint32_t magic = 0;
×
1075
  uint16_t version = 0;
×
1076
  memcpy(&magic, ptr, sizeof(magic));
×
1077
  ptr += sizeof(magic);
×
1078
  memcpy(&version, ptr, sizeof(version));
×
1079
  ptr += sizeof(version);
×
1080

1081
  if (magic != 0x544B5659) {
×
1082
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1083
    taosMemoryFree(buffer);
×
1084
    return TSDB_CODE_FILE_CORRUPTED;
×
1085
  }
1086

1087
  // Array of keys and their algorithms
1088
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1089
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1090
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1091

1092
  // Verify each key from buffer
1093
  for (int i = 0; i < 5; i++) {
×
1094
    // Check if we have enough data remaining
1095
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1096
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1097
      taosMemoryFree(buffer);
×
1098
      return TSDB_CODE_FILE_CORRUPTED;
×
1099
    }
1100

1101
    int32_t savedAlgo = 0;
×
1102
    int32_t encryptedLen = 0;
×
1103

1104
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1105
    ptr += sizeof(int32_t);
×
1106
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1107
    ptr += sizeof(int32_t);
×
1108

1109
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1110
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1111
      taosMemoryFree(buffer);
×
1112
      return TSDB_CODE_FILE_CORRUPTED;
×
1113
    }
1114

1115
    if (ptr - buffer + encryptedLen > fileSize) {
×
1116
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1117
      taosMemoryFree(buffer);
×
1118
      return TSDB_CODE_FILE_CORRUPTED;
×
1119
    }
1120

1121
    uint8_t encrypted[512] = {0};
×
1122
    memcpy(encrypted, ptr, encryptedLen);
×
1123
    ptr += encryptedLen;
×
1124

1125
    // Decrypt with current key using CBC
1126
    char decrypted[512] = {0};
×
1127

1128
    SCryptOpts opts = {0};
×
1129
    opts.len = encryptedLen;
×
1130
    opts.source = (char *)encrypted;
×
1131
    opts.result = decrypted;
×
1132
    opts.unitLen = 16;
×
1133
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
1134
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1135

1136
    int32_t count = CBC_Decrypt(&opts);
×
1137
    if (count != opts.len) {
×
1138
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1139
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1140
             count, opts.len, tstrerror(code));
1141
      taosMemoryFree(buffer);
×
1142
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1143
    }
1144

1145
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1146
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1147
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1148
      taosMemoryFree(buffer);
×
1149
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1150
    }
1151

1152
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1153
  }
1154

1155
  taosMemoryFree(buffer);
×
1156
  dInfo("all encryption keys verified successfully");
×
1157
  return 0;
×
1158
}
1159

1160
// Public API: Verify and initialize encryption keys at startup
1161
int32_t dmVerifyAndInitEncryptionKeys(void) {
718,372✔
1162
  // Skip verification in dump sdb mode (taosd -s)
1163
  if (tsSkipKeyCheckMode) {
718,372✔
1164
    dInfo("skip encryption key verification in some special check mode");
800✔
1165
    return 0;
800✔
1166
  }
1167

1168
  // Check if encryption keys are loaded
1169
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
717,572✔
1170
    dDebug("encryption keys not loaded, skipping verification");
715,255✔
1171
    return 0;
715,255✔
1172
  }
1173

1174
  // Get key file paths
1175
  char    masterKeyFile[PATH_MAX] = {0};
2,317✔
1176
  char    derivedKeyFile[PATH_MAX] = {0};
2,317✔
1177
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
2,317✔
1178
                            TD_DIRSEP, TD_DIRSEP);
1179
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
2,317✔
1180
    dError("failed to build master key file path");
×
1181
    return TSDB_CODE_OUT_OF_BUFFER;
×
1182
  }
1183

1184
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
2,317✔
1185
                    TD_DIRSEP, TD_DIRSEP);
1186
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
2,317✔
1187
    dError("failed to build derived key file path");
×
1188
    return TSDB_CODE_OUT_OF_BUFFER;
×
1189
  }
1190

1191
  // Load encryption keys
1192
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
2,317✔
1193
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
2,317✔
1194
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
2,317✔
1195
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
2,317✔
1196
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
2,317✔
1197
  int32_t algorithm = 0;
2,317✔
1198
  int32_t cfgAlgorithm = 0;
2,317✔
1199
  int32_t metaAlgorithm = 0;
2,317✔
1200
  int32_t fileVersion = 0;
2,317✔
1201
  int32_t keyVersion = 0;
2,317✔
1202
  int64_t createTime = 0;
2,317✔
1203
  int64_t svrKeyUpdateTime = 0;
2,317✔
1204
  int64_t dbKeyUpdateTime = 0;
2,317✔
1205

1206
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
2,317✔
1207
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1208
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1209
  if (code != 0) {
2,317✔
1210
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1211
    return code;
×
1212
  }
1213

1214
  // Verify all keys
1215
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,317✔
1216
  if (code != 0) {
2,317✔
1217
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1218
    return code;
×
1219
  }
1220

1221
  dInfo("encryption keys verified and initialized successfully");
2,317✔
1222
  return 0;
2,317✔
1223
}
1224
#else
1225
// Variant compiled when TD_ENTERPRISE and TD_HAS_TAOSK are not both defined
// (community edition or no TaosK support): there is nothing to verify, so
// success is always reported.
int32_t dmVerifyAndInitEncryptionKeys(void) {
  const int32_t ok = 0;
  return ok;
}
1229
#endif
1230

1231
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1232
static int32_t dmUpdateSvrKey(const char *newKey) {
167✔
1233
  if (newKey == NULL || newKey[0] == '\0') {
167✔
1234
    dError("invalid new SVR_KEY, key is empty");
×
1235
    return TSDB_CODE_INVALID_PARA;
×
1236
  }
1237

1238
  char masterKeyFile[PATH_MAX] = {0};
167✔
1239
  char derivedKeyFile[PATH_MAX] = {0};
167✔
1240

1241
  // Build path to key files
1242
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
167✔
1243
                            TD_DIRSEP, TD_DIRSEP);
1244
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
167✔
1245
    dError("failed to build master key file path");
×
1246
    return TSDB_CODE_OUT_OF_BUFFER;
×
1247
  }
1248

1249
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
167✔
1250
                    TD_DIRSEP, TD_DIRSEP);
1251
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
167✔
1252
    dError("failed to build derived key file path");
×
1253
    return TSDB_CODE_OUT_OF_BUFFER;
×
1254
  }
1255

1256
  // Load current keys
1257
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1258
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1259
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1260
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1261
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1262
  int32_t algorithm = 0;
167✔
1263
  int32_t cfgAlgorithm = 0;
167✔
1264
  int32_t metaAlgorithm = 0;
167✔
1265
  int32_t fileVersion = 0;
167✔
1266
  int32_t keyVersion = 0;
167✔
1267
  int64_t createTime = 0;
167✔
1268
  int64_t svrKeyUpdateTime = 0;
167✔
1269
  int64_t dbKeyUpdateTime = 0;
167✔
1270

1271
  int32_t code =
1272
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
167✔
1273
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1274
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1275
  if (code != 0) {
167✔
1276
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1277
    return code;
×
1278
  }
1279

1280
  // Update SVR_KEY
1281
  int64_t now = taosGetTimestampMs();
167✔
1282
  int32_t newKeyVersion = keyVersion + 1;
167✔
1283

1284
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
167✔
1285
  tstrncpy(svrKey, newKey, sizeof(svrKey));
167✔
1286
  svrKeyUpdateTime = now;
167✔
1287

1288
  // Save updated keys (use algorithm for all keys for backward compatibility)
1289
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
167✔
1290
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1291
  if (code != 0) {
167✔
1292
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1293
    return code;
×
1294
  }
1295

1296
  // Update key verification file with new SVR_KEY
1297
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
167✔
1298
  if (code != 0) {
167✔
1299
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1300
    // Don't fail the operation if verification file update fails
1301
  }
1302

1303
  // Update global variables
1304
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
167✔
1305
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
167✔
1306
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
167✔
1307
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
167✔
1308
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
167✔
1309
  tsEncryptAlgorithmType = algorithm;
167✔
1310
  tsEncryptFileVersion = fileVersion;
167✔
1311
  tsEncryptKeyVersion = newKeyVersion;
167✔
1312
  tsEncryptKeyCreateTime = createTime;
167✔
1313
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
167✔
1314
  tsDbKeyUpdateTime = dbKeyUpdateTime;
167✔
1315

1316
  // Update encryption key status for backward compatibility
1317
  int keyLen = strlen(tsDataKey);
167✔
1318
  if (keyLen > ENCRYPT_KEY_LEN) {
167✔
1319
    keyLen = ENCRYPT_KEY_LEN;
×
1320
  }
1321
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
167✔
1322
  memcpy(tsEncryptKey, tsDataKey, keyLen);
167✔
1323
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
167✔
1324
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
167✔
1325

1326
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
167✔
1327
  return 0;
167✔
1328
}
1329

1330
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
1331
  if (days < 0) {
×
1332
    dError("invalid days value:%d, must be >= 0", days);
×
1333
    return TSDB_CODE_INVALID_PARA;
×
1334
  }
1335

1336
  if (strategy == NULL || strategy[0] == '\0') {
×
1337
    dError("invalid strategy, strategy is empty");
×
1338
    return TSDB_CODE_INVALID_PARA;
×
1339
  }
1340

1341
  // Validate strategy value
1342
  if (strcmp(strategy, "ALARM") != 0) {
×
1343
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1344
  }
1345

1346
  // Update global variables directly
1347
  tsKeyExpirationDays = days;
×
1348
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1349

1350
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
1351
  return 0;
×
1352
}
1353

1354
static int32_t dmUpdateDbKey(const char *newKey) {
167✔
1355
  if (newKey == NULL || newKey[0] == '\0') {
167✔
1356
    dError("invalid new DB_KEY, key is empty");
×
1357
    return TSDB_CODE_INVALID_PARA;
×
1358
  }
1359

1360
  char masterKeyFile[PATH_MAX] = {0};
167✔
1361
  char derivedKeyFile[PATH_MAX] = {0};
167✔
1362

1363
  // Build path to key files
1364
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
167✔
1365
                            TD_DIRSEP, TD_DIRSEP);
1366
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
167✔
1367
    dError("failed to build master key file path");
×
1368
    return TSDB_CODE_OUT_OF_BUFFER;
×
1369
  }
1370

1371
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
167✔
1372
                    TD_DIRSEP, TD_DIRSEP);
1373
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
167✔
1374
    dError("failed to build derived key file path");
×
1375
    return TSDB_CODE_OUT_OF_BUFFER;
×
1376
  }
1377

1378
  // Load current keys
1379
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1380
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1381
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1382
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1383
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
167✔
1384
  int32_t algorithm = 0;
167✔
1385
  int32_t cfgAlgorithm = 0;
167✔
1386
  int32_t metaAlgorithm = 0;
167✔
1387
  int32_t fileVersion = 0;
167✔
1388
  int32_t keyVersion = 0;
167✔
1389
  int64_t createTime = 0;
167✔
1390
  int64_t svrKeyUpdateTime = 0;
167✔
1391
  int64_t dbKeyUpdateTime = 0;
167✔
1392

1393
  int32_t code =
1394
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
167✔
1395
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1396
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1397
  if (code != 0) {
167✔
1398
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1399
    return code;
×
1400
  }
1401

1402
  // Update DB_KEY
1403
  int64_t now = taosGetTimestampMs();
167✔
1404
  int32_t newKeyVersion = keyVersion + 1;
167✔
1405

1406
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
167✔
1407
  tstrncpy(dbKey, newKey, sizeof(dbKey));
167✔
1408
  dbKeyUpdateTime = now;
167✔
1409

1410
  // Save updated keys (use algorithm for all keys for backward compatibility)
1411
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
167✔
1412
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1413
  if (code != 0) {
167✔
1414
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1415
    return code;
×
1416
  }
1417

1418
  // Update key verification file with new DB_KEY
1419
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
167✔
1420
  if (code != 0) {
167✔
1421
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1422
    // Don't fail the operation if verification file update fails
1423
  }
1424

1425
  // Update global variables
1426
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
167✔
1427
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
167✔
1428
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
167✔
1429
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
167✔
1430
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
167✔
1431
  tsEncryptAlgorithmType = algorithm;
167✔
1432
  tsEncryptFileVersion = fileVersion;
167✔
1433
  tsEncryptKeyVersion = newKeyVersion;
167✔
1434
  tsEncryptKeyCreateTime = createTime;
167✔
1435
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
167✔
1436
  tsDbKeyUpdateTime = dbKeyUpdateTime;
167✔
1437

1438
  // Update encryption key status for backward compatibility
1439
  int keyLen = strlen(tsDataKey);
167✔
1440
  if (keyLen > ENCRYPT_KEY_LEN) {
167✔
1441
    keyLen = ENCRYPT_KEY_LEN;
×
1442
  }
1443
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
167✔
1444
  memcpy(tsEncryptKey, tsDataKey, keyLen);
167✔
1445
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
167✔
1446
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
167✔
1447

1448
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
167✔
1449
  return 0;
167✔
1450
}
1451
#endif
1452

1453
// Handles a request to replace one of the dnode encryption keys.
//
// The request is decoded from pMsg and dispatched on keyType:
//   0 -> dmUpdateSvrKey(newKey)
//   1 -> dmUpdateDbKey(newKey)
//   anything else -> TSDB_CODE_INVALID_PARA
// The outcome is stored in pMsg->code and also returned; the response payload
// is always cleared. Builds without TD_ENTERPRISE and TD_HAS_TAOSK reject the
// request with TSDB_CODE_OPS_NOT_SUPPORT.
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  int32_t              code = 0;
  SMAlterEncryptKeyReq alterKeyReq = {0};

  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &alterKeyReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
    goto _exit;
  }

  dInfo("received alter encrypt key req, keyType:%d", alterKeyReq.keyType);

  switch (alterKeyReq.keyType) {
    case 0:  // SVR_KEY
      code = dmUpdateSvrKey(alterKeyReq.newKey);
      if (code != 0) {
        dError("failed to update SVR_KEY, since %s", tstrerror(code));
      } else {
        dInfo("successfully updated SVR_KEY");
      }
      break;

    case 1:  // DB_KEY
      code = dmUpdateDbKey(alterKeyReq.newKey);
      if (code != 0) {
        dError("failed to update DB_KEY, since %s", tstrerror(code));
      } else {
        dInfo("successfully updated DB_KEY");
      }
      break;

    default:
      dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", alterKeyReq.keyType);
      code = TSDB_CODE_INVALID_PARA;
      break;
  }

_exit:
  // Shared epilogue: release the decoded request and publish the result.
  tFreeSMAlterEncryptKeyReq(&alterKeyReq);
  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  dError("encryption key management is only available in enterprise edition");
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1501

1502
// Handles a request to change the key-expiration policy.
//
// Decodes SMAlterKeyExpirationReq from pMsg and forwards its `days` and
// `strategy` fields to dmUpdateKeyExpiration(). The outcome is stored in
// pMsg->code and returned; the response payload is always cleared. Builds
// without TD_ENTERPRISE and TD_HAS_TAOSK reject the request with
// TSDB_CODE_OPS_NOT_SUPPORT.
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  int32_t                 code = 0;
  SMAlterKeyExpirationReq alterReq = {0};

  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
  } else {
    dInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);

    code = dmUpdateKeyExpiration(alterReq.days, alterReq.strategy);
    if (code != 0) {
      dError("failed to update key expiration, since %s", tstrerror(code));
    } else {
      dInfo("successfully updated key expiration: %d days, strategy: %s", alterReq.days, alterReq.strategy);
    }
  }

  // Runs on every path: release the decoded request and publish the result.
  tFreeSMAlterKeyExpirationReq(&alterReq);
  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  dError("key expiration management is only available in enterprise edition");
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1536

1537
// Reloads the TLS configuration of every RPC transport referenced by
// pMgmt->msgCb: the server transport first, then the client, status and sync
// client transports.
//
// Stops at the first transport whose reload fails, logs which one it was and
// returns that error code; returns 0 when all four reloads succeed. pMsg is
// not touched.
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMsgCb *msgCb = &pMgmt->msgCb;

  // Ordered list of transports to reload; `name` is only used for logging.
  const struct {
    void       *pTrans;
    int32_t     connType;
    const char *name;
  } transports[] = {
      {msgCb->serverRpc, TAOS_CONN_SERVER, "server"},
      {msgCb->clientRpc, TAOS_CONN_CLIENT, "cli"},
      {msgCb->statusRpc, TAOS_CONN_CLIENT, "status-cli"},
      {msgCb->syncRpc, TAOS_CONN_CLIENT, "sync-cli"},
  };
  const int32_t numOfTransports = (int32_t)(sizeof(transports) / sizeof(transports[0]));

  for (int32_t i = 0; i < numOfTransports; ++i) {
    int32_t code = rpcReloadTlsConfig(transports[i].pTrans, transports[i].connType);
    if (code != 0) {
      dError("failed to reload tls config for transport %s since %s", transports[i].name, tstrerror(code));
      return code;
    }
  }

  return 0;
}
1574

1575
// Fills pStatus with the current service state of this dnode.
//
// The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
// downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED when the mnode load (if this
// node reports isMnode) or any vnode load has a sync state of
// TAOS_SYNC_STATE_ERROR or TAOS_SYNC_STATE_OFFLINE; details then names the
// first offender found. The mnode is checked before the vnodes.
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  pMgmt->getMnodeLoadsFp(&mnodeInfo);

  bool mnodeUnhealthy = mnodeInfo.isMnode && (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR ||
                                              mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE);
  if (mnodeUnhealthy) {
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
             syncStr(mnodeInfo.load.syncState));
    return;
  }

  SMonVloadInfo vnodeInfo = {0};
  pMgmt->getVnodeLoadsFp(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pVnode = taosArrayGet(vnodeInfo.pVloads, idx);
    idx++;

    if (pVnode->syncState != TAOS_SYNC_STATE_ERROR && pVnode->syncState != TAOS_SYNC_STATE_OFFLINE) {
      continue;
    }

    // Only the first unhealthy vnode is reported.
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVnode->vgId,
             syncStr(pVnode->syncState));
    break;
  }

  // The vnode load array filled in by getVnodeLoadsFp is released here.
  taosArrayDestroy(vnodeInfo.pVloads);
}
1602

1603
// Answers a "server run status" request.
//
// Collects the current status via dmGetServerRunStatus(), serializes it into a
// buffer obtained from rpcMallocCont() and attaches that buffer to
// pMsg->info.rsp / rspLen.
//
// Returns 0 on success. On failure pMsg->info.rsp stays NULL and the return is
// TSDB_CODE_OUT_OF_MEMORY (size probe failed), terrno (allocation failed) or
// TSDB_CODE_INVALID_MSG (serialization into the buffer failed).
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never handed to pMsg, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
1636

1637
// Allocates an empty data block whose columns follow the schema of the
// TSDB_INS_TABLE_DNODE_VARIABLES system table.
//
// On success stores the new block in *ppBlock and returns 0; the caller is
// then responsible for destroying it. On failure *ppBlock is left untouched
// and the return is terrno (allocation failure) or TSDB_CODE_INTERNAL_ERROR
// (the table is missing from the meta list returned by getInfosDbMeta()).
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  int32_t code = 0;

  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return terrno;
  }

  size_t               size = 0;
  const SSysTableMeta *pMeta = NULL;
  getInfosDbMeta(&pMeta, &size);

  // Start at -1 so a missing table is detected instead of silently falling
  // back to the schema of whatever table happens to be first in the list.
  int32_t index = -1;
  for (int32_t i = 0; i < (int32_t)size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      index = i;
      break;
    }
  }
  if (index < 0) {
    taosMemoryFree(pBlock);
    return TSDB_CODE_INTERNAL_ERROR;
  }

  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    code = terrno;
    goto _exit;
  }

  // One column descriptor per schema entry; column ids are 1-based.
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = i + 1;
    colInfoData.info.type = pMeta[index].schema[i].type;
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  pBlock->info.hasVarCol = true;
_exit:
  if (code != 0) {
    blockDataDestroy(pBlock);
  } else {
    *ppBlock = pBlock;
  }
  return code;
}
1684

1685
// Populates pBlock with the configuration variables produced by
// dumpConfToDataBlock() and then writes dnodeId into column 0 via
// colDataSetNItems() using the block's row count.
//
// Returns 0 on success, the error from dumpConfToDataBlock() or
// colDataSetNItems(), or TSDB_CODE_OUT_OF_RANGE when the block has no column 0.
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  const int32_t dumpCode = dumpConfToDataBlock(pBlock, 1, NULL, SHOW_VAR_PRIV_ALL);
  if (dumpCode != 0) {
    return dumpCode;
  }

  SColumnInfoData *pDnodeIdCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (pDnodeIdCol != NULL) {
    return colDataSetNItems(pDnodeIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
  }

  return TSDB_CODE_OUT_OF_RANGE;
}
1698

1699
// Allocates an empty data block whose columns follow the schema of the
// TSDB_INS_TABLE_CPU_ALLOCATION system table.
//
// On success stores the new block in *ppBlock and returns 0. On failure
// *ppBlock is left untouched and the return is terrno (allocation failure) or
// TSDB_CODE_INTERNAL_ERROR (table not present in the meta list).
static int32_t dmBuildCpuAllocationBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNewBlock == NULL) {
    return terrno;
  }

  const SSysTableMeta *pMetaList = NULL;
  size_t               numOfMetas = 0;
  getInfosDbMeta(&pMetaList, &numOfMetas);

  // Locate the schema entry for the cpu-allocation table.
  const SSysTableMeta *pCpuMeta = NULL;
  for (int32_t i = 0; i < (int32_t)numOfMetas; ++i) {
    if (strcmp(pMetaList[i].name, TSDB_INS_TABLE_CPU_ALLOCATION) == 0) {
      pCpuMeta = &pMetaList[i];
      break;
    }
  }
  if (pCpuMeta == NULL) {
    taosMemoryFree(pNewBlock);
    return TSDB_CODE_INTERNAL_ERROR;
  }

  pNewBlock->pDataBlock = taosArrayInit(pCpuMeta->colNum, sizeof(SColumnInfoData));
  if (pNewBlock->pDataBlock == NULL) {
    int32_t initCode = terrno;
    if (initCode != 0) {
      blockDataDestroy(pNewBlock);
    } else {
      *ppBlock = pNewBlock;
    }
    return initCode;
  }

  // One column descriptor per schema entry; column ids are 1-based.
  for (int32_t col = 0; col < pCpuMeta->colNum; ++col) {
    SColumnInfoData colDesc = {0};
    colDesc.info.colId = col + 1;
    colDesc.info.type = pCpuMeta->schema[col].type;
    colDesc.info.bytes = pCpuMeta->schema[col].bytes;

    if (taosArrayPush(pNewBlock->pDataBlock, &colDesc) == NULL) {
      int32_t pushCode = terrno;
      if (pushCode != 0) {
        blockDataDestroy(pNewBlock);
      } else {
        *ppBlock = pNewBlock;
      }
      return pushCode;
    }
  }

  pNewBlock->info.hasVarCol = true;
  *ppBlock = pNewBlock;
  return 0;
}
1746

1747
// Appends one row to the cpu-allocation block at index pBlock->info.rows.
//
// Columns are written in order: 0 dnodeId, 1 category (var-string),
// 2 cores, 3 coreIds (var-string), 4 enabled (stored as int8_t 0/1).
// The row counter is only incremented after all five columns were written.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_RANGE when a column is missing, or
// the error reported by colDataSetVal(). Columns written before a failure are
// not rolled back.
//
// NOTE(review): category and coreIds are copied into fixed 16- and 256-byte
// stack buffers; this relies on STR_TO_VARSTR's input fitting, which the
// visible caller satisfies. Confirm before adding new callers.
static int32_t dmAppendCpuAllocationRow(SSDataBlock *pBlock, int32_t dnodeId, const char *category, int32_t cores,
                                        const char *coreIds, bool enabled) {
  int32_t          ret = 0;
  int32_t          rowIdx = pBlock->info.rows;
  SColumnInfoData *pCol = NULL;

  // dnode_id (INT)
  pCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (pCol == NULL) {
    return TSDB_CODE_OUT_OF_RANGE;
  }
  ret = colDataSetVal(pCol, rowIdx, (const char *)&dnodeId, false);
  if (ret != 0) {
    return ret;
  }

  // thread_category (VARCHAR)
  pCol = taosArrayGet(pBlock->pDataBlock, 1);
  if (pCol == NULL) {
    return TSDB_CODE_OUT_OF_RANGE;
  }
  char categoryVar[16 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(categoryVar, category);
  ret = colDataSetVal(pCol, rowIdx, categoryVar, false);
  if (ret != 0) {
    return ret;
  }

  // cores (INT)
  pCol = taosArrayGet(pBlock->pDataBlock, 2);
  if (pCol == NULL) {
    return TSDB_CODE_OUT_OF_RANGE;
  }
  ret = colDataSetVal(pCol, rowIdx, (const char *)&cores, false);
  if (ret != 0) {
    return ret;
  }

  // core_ids (VARCHAR)
  pCol = taosArrayGet(pBlock->pDataBlock, 3);
  if (pCol == NULL) {
    return TSDB_CODE_OUT_OF_RANGE;
  }
  char coreIdsVar[256 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(coreIdsVar, coreIds);
  ret = colDataSetVal(pCol, rowIdx, coreIdsVar, false);
  if (ret != 0) {
    return ret;
  }

  // enabled (BOOL)
  pCol = taosArrayGet(pBlock->pDataBlock, 4);
  if (pCol == NULL) {
    return TSDB_CODE_OUT_OF_RANGE;
  }
  int8_t enabledFlag = 0;
  if (enabled) {
    enabledFlag = 1;
  }
  ret = colDataSetVal(pCol, rowIdx, (const char *)&enabledFlag, false);
  if (ret != 0) {
    return ret;
  }

  pBlock->info.rows++;
  return 0;
}
1790

1791
// Writes one row per thread category into the cpu-allocation block, using the
// status returned by taosGetCpuAllocStatus().
//
// When the status is enabled, each row carries the category's core count and a
// comma-separated list of its core ids; otherwise the row carries 0 cores, "-"
// as the id list and enabled = false.
//
// Returns 0 on success or the first error from blockDataEnsureCapacity() /
// dmAppendCpuAllocationRow().
//
// NOTE(review): the label table has three entries while the loop bound is
// THREAD_CAT_COUNT; this assumes THREAD_CAT_COUNT <= 3 -- confirm.
static int32_t dmFillCpuAllocationBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  const SCpuAllocStatus *pAlloc = taosGetCpuAllocStatus();
  const char            *categoryLabels[] = {"management", "write", "read"};

  int32_t ret = blockDataEnsureCapacity(pBlock, THREAD_CAT_COUNT);
  if (ret != 0) {
    return ret;
  }

  for (int32_t cat = 0; cat < THREAD_CAT_COUNT; ++cat) {
    char    idList[256] = {0};
    int32_t numCores = 0;
    bool    pinned = false;

    if (!pAlloc->enabled) {
      tstrncpy(idList, "-", sizeof(idList));
    } else {
      numCores = pAlloc->sets[cat].count;
      pinned = true;

      // Stop appending once fewer than 8 bytes of headroom remain, so a long
      // core list is cut short rather than written past the buffer.
      int     used = 0;
      int32_t k = 0;
      while (k < pAlloc->sets[cat].count && used < (int)sizeof(idList) - 8) {
        const char *sep = (k > 0) ? "," : "";
        used += snprintf(idList + used, sizeof(idList) - used, "%s%d", sep, pAlloc->sets[cat].coreIds[k]);
        k++;
      }
    }

    ret = dmAppendCpuAllocationRow(pBlock, dnodeId, categoryLabels[cat], numCores, idList, pinned);
    if (ret != 0) {
      return ret;
    }
  }

  return 0;
}
1821

1822
// Serves a system-table retrieve request addressed to this dnode.
//
// Only TSDB_INS_TABLE_DNODE_VARIABLES and TSDB_INS_TABLE_CPU_ALLOCATION are
// accepted (case-insensitive); any other table name, or a request that cannot
// be decoded, yields TSDB_CODE_INVALID_MSG.
//
// The response buffer is laid out as:
//   SRetrieveMetaTableRsp header | int32 column count | SSysTableSchema[cols] | encoded block
// with multi-byte header fields converted to network byte order. On success
// the buffer is attached to pMsg->info.rsp / rspLen and 0 is returned. On
// failure every intermediate resource is released and an error code is
// returned.
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES) != 0 &&
      strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_CPU_ALLOCATION) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;

  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_CPU_ALLOCATION) == 0) {
    if ((code = dmBuildCpuAllocationBlock(&pBlock)) != 0) {
      return code;
    }
    code = dmFillCpuAllocationBlock(pBlock, pMgmt->pData->dnodeId);
    if (code != 0) {
      blockDataDestroy(pBlock);
      return code;
    }
  } else {
    if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
      return code;
    }

    code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
    if (code != 0) {
      blockDataDestroy(pBlock);
      return code;
    }
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  // Column schema table, one entry per block column.
  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before the cleanup calls below get a chance to overwrite
    // it, and log the real failure instead of the still-zero `code`.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  // The encoded copy lives in pRsp; the block itself is no longer needed.
  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
1910

1911
// Handles the response to a stream heartbeat.
//
// If the response already carries an error code, or its body (the content
// after the SStreamMsgGrpHeader) cannot be decoded, the failure is passed to
// streamHbHandleRspErr() together with the receive timestamp. Otherwise the
// decoded message is handed to streamHbProcessRspMsg().
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int64_t recvTs = taosGetTimestampMs();

  if (pMsg->code != 0) {
    return streamHbHandleRspErr(pMsg->code, recvTs);
  }

  // Skip the group header that precedes the encoded heartbeat response.
  char   *pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t bodyLen = pMsg->contLen - sizeof(SStreamMsgGrpHeader);

  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  tDecoderInit(&dec, (uint8_t *)pBody, bodyLen);

  if (tDecodeStreamHbRsp(&dec, &hbRsp) < 0) {
    int32_t decodeCode = TSDB_CODE_INVALID_MSG;
    tDeepFreeSMStreamHbRspMsg(&hbRsp);
    tDecoderClear(&dec);
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(decodeCode));
    return streamHbHandleRspErr(decodeCode, recvTs);
  }

  tDecoderClear(&dec);
  return streamHbProcessRspMsg(&hbRsp);
}
1937

1938

1939
SArray *dmGetMsgHandles() {
718,577✔
1940
  int32_t code = -1;
718,577✔
1941
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
718,577✔
1942
  if (pArray == NULL) {
718,577✔
1943
    return NULL;
×
1944
  }
1945

1946
  // Requests handled by DNODE
1947
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1951
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1952
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1953
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1954
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1955
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1956
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1957
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1958
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1959
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1960
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1961
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1962
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1963
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1964
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1965

1966
  // Requests handled by MNODE
1967
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1968
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
718,577✔
1970

1971
  code = 0;
718,577✔
1972

1973
_OVER:
718,577✔
1974
  if (code != 0) {
718,577✔
1975
    taosArrayDestroy(pArray);
×
1976
    return NULL;
×
1977
  } else {
1978
    return pArray;
718,577✔
1979
  }
1980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc