• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5053

13 May 2026 12:00PM UTC coverage: 73.397% (+0.06%) from 73.338%
#5053

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

627 existing lines in 131 files now uncovered.

281694 of 383795 relevant lines covered (73.4%)

132505311.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.74
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void     setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId);
35

36
#ifndef TD_ENTERPRISE
37
void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
void getAuditDbNameToken(char *pDb, char *pToken) {}
44
#endif
45

46
extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId);
47

48
#ifndef TD_ENTERPRISE
49
void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {}
50
#endif
51

52
SMonVloadInfo tsVinfo = {0};
53
SMnodeLoad    tsMLoad = {0};
54
SDnodeData    tsDnodeData = {0};
55

56
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
2,287,078✔
57
  int32_t code = 0;
2,287,078✔
58
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
2,287,078✔
59
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
561,509✔
60
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
561,509✔
61
    pMgmt->pData->dnodeId = pCfg->dnodeId;
561,509✔
62
    pMgmt->pData->clusterId = pCfg->clusterId;
561,509✔
63
    monSetDnodeId(pCfg->dnodeId);
561,509✔
64
    auditSetDnodeId(pCfg->dnodeId);
561,509✔
65
    code = dmWriteEps(pMgmt->pData);
561,509✔
66
    if (code != 0) {
561,509✔
67
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
3,610✔
68
            tstrerror(code));
69
    }
70
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
561,509✔
71
  }
72
}
2,287,078✔
73

74
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,494,472✔
75
  int32_t code = 0;
2,494,472✔
76
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,494,472✔
77
  if (pMgmt->pData->ipWhiteVer == ver) {
2,494,472✔
78
    if (ver == 0) {
2,493,857✔
79
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,492,957✔
80
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,492,957✔
81
        dError("failed to disable ip white list on dnode");
×
82
      }
83
    }
84
    return;
2,493,857✔
85
  }
86
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
615✔
87

88
  SRetrieveWhiteListReq req = {.ver = oldVer};
615✔
89
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
615✔
90
  if (contLen < 0) {
615✔
91
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
92
    return;
×
93
  }
94
  void *pHead = rpcMallocCont(contLen);
615✔
95
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
615✔
96
  if (contLen < 0) {
615✔
97
    rpcFreeCont(pHead);
×
98
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
99
    return;
×
100
  }
101

102
  SRpcMsg rpcMsg = {.pCont = pHead,
615✔
103
                    .contLen = contLen,
104
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
105
                    .info.ahandle = 0,
106
                    .info.notFreeAhandle = 1,
107
                    .info.refId = 0,
108
                    .info.noResp = 0,
109
                    .info.handle = 0};
110
  SEpSet  epset = {0};
615✔
111

112
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
615✔
113

114
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
615✔
115
  if (code != 0) {
615✔
116
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
117
  }
118
}
119

120

121

122
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,494,472✔
123
  int32_t code = 0;
2,494,472✔
124
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,494,472✔
125
  if (pMgmt->pData->timeWhiteVer == ver) {
2,494,472✔
126
    if (ver == 0) {
2,493,857✔
127
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,492,957✔
128
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,492,957✔
129
        dError("failed to disable time white list on dnode");
×
130
      }
131
    }
132
    return;
2,493,857✔
133
  }
134
  int64_t oldVer = pMgmt->pData->timeWhiteVer;
615✔
135

136
  SRetrieveWhiteListReq req = {.ver = oldVer};
615✔
137
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
615✔
138
  if (contLen < 0) {
615✔
139
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
×
140
    return;
×
141
  }
142
  void *pHead = rpcMallocCont(contLen);
615✔
143
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
615✔
144
  if (contLen < 0) {
615✔
145
    rpcFreeCont(pHead);
×
146
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
×
147
    return;
×
148
  }
149

150
  SRpcMsg rpcMsg = {.pCont = pHead,
615✔
151
                    .contLen = contLen,
152
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
153
                    .info.ahandle = 0,
154
                    .info.notFreeAhandle = 1,
155
                    .info.refId = 0,
156
                    .info.noResp = 0,
157
                    .info.handle = 0};
158
  SEpSet  epset = {0};
615✔
159

160
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
615✔
161

162
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
615✔
163
  if (code != 0) {
615✔
164
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
×
165
  }
166
}
167

168

169

170
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
2,494,472✔
171
  int32_t code = 0;
2,494,472✔
172
  int64_t oldVer = taosAnalyGetVersion();
2,494,472✔
173
  if (oldVer == newVer) return;
2,494,472✔
174
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
175

176
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
177
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
178
  if (contLen < 0) {
×
179
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
180
    return;
×
181
  }
182

183
  void *pHead = rpcMallocCont(contLen);
×
184
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
185
  if (contLen < 0) {
×
186
    rpcFreeCont(pHead);
×
187
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
188
    return;
×
189
  }
190

191
  SRpcMsg rpcMsg = {
×
192
      .pCont = pHead,
193
      .contLen = contLen,
194
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
195
      .info.ahandle = 0,
196
      .info.refId = 0,
197
      .info.noResp = 0,
198
      .info.handle = 0,
199
  };
200
  SEpSet epset = {0};
×
201

202
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
203

204
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
205
  if (code != 0) {
×
206
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
207
  }
208
}
209

210
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
57,075,795✔
211
  const STraceId *trace = &pRsp->info.traceId;
57,075,795✔
212
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
57,075,795✔
213

214
  if (pRsp->code != 0) {
57,075,795✔
215
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
501,802✔
216
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
9,007✔
217
             pMgmt->statusSeq);
218
      pMgmt->pData->dropped = 1;
9,007✔
219
      if (dmWriteEps(pMgmt->pData) != 0) {
9,007✔
220
        dError("failed to write dnode file");
×
221
      }
222
      dInfo("dnode will exit since it is in the dropped state");
9,007✔
223
      (void)raise(SIGINT);
9,007✔
224
    }
225
  } else {
226
    SStatusRsp statusRsp = {0};
56,573,993✔
227
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
59,068,465✔
228
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
2,494,472✔
229
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
2,494,472✔
230
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
2,287,078✔
231
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
232
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
2,287,078✔
233
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
2,287,078✔
234
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
2,287,078✔
235
      }
236
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId);
2,494,472✔
237
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
2,494,472✔
238
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
2,494,472✔
239
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
2,494,472✔
240
    }
241
    tFreeSStatusRsp(&statusRsp);
56,573,993✔
242
  }
243
  rpcFreeCont(pRsp->pCont);
57,075,795✔
244
}
57,075,795✔
245

246
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
57,182,546✔
247
  int32_t    code = 0;
57,182,546✔
248
  SStatusReq req = {0};
57,182,546✔
249
  req.timestamp = taosGetTimestampMs();
57,182,546✔
250
  pMgmt->statusSeq++;
57,182,546✔
251

252
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
57,182,546✔
253
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
57,182,546✔
254
    dError("failed to lock status info lock");
×
UNCOV
255
    return;
×
256
  }
257

258
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
57,182,546✔
259
  req.sver = tsVersion;
57,182,546✔
260
  req.dnodeVer = tsDnodeData.dnodeVer;
57,182,546✔
261
  req.dnodeId = tsDnodeData.dnodeId;
57,182,546✔
262
  req.clusterId = tsDnodeData.clusterId;
57,182,546✔
263
  if (req.clusterId == 0) req.dnodeId = 0;
57,182,546✔
264
  req.rebootTime = tsDnodeData.rebootTime;
57,182,546✔
265
  req.updateTime = tsDnodeData.updateTime;
57,182,546✔
266
  req.numOfCores = tsNumOfCores;
57,182,546✔
267
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
57,182,546✔
268
  req.numOfDiskCfg = tsDiskCfgNum;
57,182,546✔
269
  req.memTotal = tsTotalMemoryKB * 1024;
57,182,546✔
270
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
57,182,546✔
271
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
57,182,546✔
272
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
57,182,546✔
273

274
  req.clusterCfg.statusInterval = tsStatusInterval;
57,182,546✔
275
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
57,182,546✔
276
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
57,182,546✔
277
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
57,182,546✔
278
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
57,182,546✔
279
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
57,182,546✔
280
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
57,182,546✔
281
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
57,182,546✔
282
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
57,182,546✔
283
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
57,182,546✔
284
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
57,182,546✔
285
  req.clusterCfg.checkTime = (int64_t)taosGetLocalTimezoneOffset(&code);
57,182,546✔
286
  if (code != 0) {
57,182,546✔
287
    dError("failed to get local timezone offset, since %s", tstrerror(code));
×
288
    (void)taosThreadMutexUnlock(&pMgmt->pData->statusInfolock);
×
289
    return;
×
290
  }
291

292
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
57,182,546✔
293
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
57,182,546✔
294
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
57,182,546✔
295
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
57,182,546✔
296

297
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
57,182,546✔
298

299
  req.pVloads = tsVinfo.pVloads;
57,182,546✔
300
  tsVinfo.pVloads = NULL;
57,182,546✔
301

302
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
57,182,546✔
303
  req.mload = tsMLoad;
57,182,546✔
304

305
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
57,182,546✔
306
    dError("failed to unlock status info lock");
×
307
    return;
×
308
  }
309

310
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
57,182,546✔
311
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
57,182,546✔
312

313
  req.statusSeq = pMgmt->statusSeq;
57,182,546✔
314
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
57,182,546✔
315
  req.analVer = taosAnalyGetVersion();
57,182,546✔
316
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;
57,182,546✔
317

318
  if (tsAuditUseToken) {
57,182,546✔
319
    getAuditDbNameToken(req.auditDB, req.auditToken);
57,177,132✔
320
  }
321

322
  if (tsAuditSaveInSelf) {
57,182,546✔
323
    getAuditEpSet(&req.auditEpSet, &req.auditVgId);
3,182✔
324
  }
325

326
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
57,182,546✔
327
  if (contLen < 0) {
57,182,546✔
328
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
329
    return;
×
330
  }
331

332
  void *pHead = rpcMallocCont(contLen);
57,182,546✔
333
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
57,182,546✔
334
  if (contLen < 0) {
57,182,546✔
335
    rpcFreeCont(pHead);
×
336
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
337
    return;
×
338
  }
339
  tFreeSStatusReq(&req);
57,182,546✔
340

341
  SRpcMsg rpcMsg = {.pCont = pHead,
57,182,546✔
342
                    .contLen = contLen,
343
                    .msgType = TDMT_MND_STATUS,
344
                    .info.ahandle = 0,
345
                    .info.notFreeAhandle = 1,
346
                    .info.refId = 0,
347
                    .info.noResp = 0,
348
                    .info.handle = 0};
349
  SRpcMsg rpcRsp = {0};
57,182,546✔
350

351
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
57,182,546✔
352

353
  SEpSet epSet = {0};
57,182,546✔
354
  int8_t epUpdated = 0;
57,182,546✔
355
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
57,182,546✔
356

357
  if (dDebugFlag & DEBUG_TRACE) {
57,182,546✔
358
    char tbuf[512];
2,134,713✔
359
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
2,137,389✔
360
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
2,137,389✔
361
  }
362
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
57,182,546✔
363
  if (code != 0) {
57,182,546✔
364
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
106,751✔
365
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
106,751✔
366
      dmRotateMnodeEpSet(pMgmt->pData);
106,751✔
367
      char tbuf[512];
106,751✔
368
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
106,751✔
369
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
106,751✔
370
            tbuf, epSet.inUse);
371
    }
372
    return;
106,751✔
373
  }
374

375
  if (rpcRsp.code != 0) {
57,075,795✔
376
    dmRotateMnodeEpSet(pMgmt->pData);
501,802✔
377
    char tbuf[512];
501,802✔
378
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
501,802✔
379
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
501,802✔
380
          tbuf, epSet.inUse);
381
  } else {
382
    if (epUpdated == 1) {
56,573,993✔
383
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
119,656✔
384
    }
385
  }
386
  dmProcessStatusRsp(pMgmt, &rpcRsp);
57,075,795✔
387
}
388

389
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
723,869✔
390
  const STraceId *trace = &pRsp->info.traceId;
723,869✔
391
  int32_t         code = 0;
723,869✔
392
  SConfigRsp      configRsp = {0};
723,869✔
393
  bool            needStop = false;
723,869✔
394

395
  if (pRsp->code != 0) {
723,869✔
396
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
397
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
398
      pMgmt->pData->dropped = 1;
×
399
      if (dmWriteEps(pMgmt->pData) != 0) {
×
400
        dError("failed to write dnode file");
×
401
      }
402
      dInfo("dnode will exit since it is in the dropped state");
×
403
      (void)raise(SIGINT);
×
404
    }
405
  } else {
406
    bool needUpdate = false;
723,869✔
407
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
1,447,738✔
408
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
723,869✔
409
      // Try to use cfg from mnode sdb.
410
      if (!configRsp.isVersionVerified) {
723,869✔
411
        uInfo("config version not verified, update config");
562,475✔
412
        needUpdate = true;
562,475✔
413
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
562,475✔
414
        if (code != TSDB_CODE_SUCCESS) {
562,475✔
415
          dError("failed to persist global config since %s", tstrerror(code));
992✔
416
          goto _exit;
992✔
417
        }
418
      }
419
    }
420
    if (needUpdate) {
722,877✔
421
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
561,483✔
422
      if (code != TSDB_CODE_SUCCESS) {
561,483✔
423
        dError("failed to update config since %s", tstrerror(code));
×
424
        goto _exit;
×
425
      }
426
      code = setAllConfigs(tsCfg);
561,483✔
427
      if (code != TSDB_CODE_SUCCESS) {
561,483✔
428
        dError("failed to set all configs since %s", tstrerror(code));
1,584✔
429
        goto _exit;
1,584✔
430
      }
431
    }
432
    code = taosPersistLocalConfig(pMgmt->path);
721,293✔
433
    if (code != TSDB_CODE_SUCCESS) {
721,293✔
434
      dError("failed to persist local config since %s", tstrerror(code));
×
435
    }
436
    tsConfigInited = 1;
721,293✔
437
  }
438
_exit:
723,869✔
439
  tFreeSConfigRsp(&configRsp);
723,869✔
440
  rpcFreeCont(pRsp->pCont);
723,869✔
441
  if (needStop) {
723,869✔
442
    dmStop();
×
443
  }
444
}
723,869✔
445

446
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
694,661✔
447
  const STraceId *trace = &pRsp->info.traceId;
694,661✔
448
  int32_t         code = 0;
694,661✔
449
  SKeySyncRsp     keySyncRsp = {0};
694,661✔
450

451
  if (pRsp->code != 0) {
694,661✔
452
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
×
453
    code = pRsp->code;
×
454
    goto _exit;
×
455
  }
456

457
  if (pRsp->pCont == NULL || pRsp->contLen <= 0) {
694,661✔
458
    dError("invalid key sync response, empty content");
×
459
    code = TSDB_CODE_INVALID_MSG;
×
460
    goto _exit;
×
461
  }
462

463
  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
694,661✔
464
  if (code != 0) {
694,661✔
465
    dError("failed to deserialize key sync response since %s", tstrerror(code));
×
466
    goto _exit;
×
467
  }
468

469
  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
694,661✔
470
        tsLocalKeyVersion, keySyncRsp.needUpdate);
471
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;
694,661✔
472
  if (keySyncRsp.needUpdate) {
694,661✔
473
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
474
    // Get encrypt file path from tsDataDir
475
    char masterKeyFile[PATH_MAX] = {0};
1,706✔
476
    char derivedKeyFile[PATH_MAX] = {0};
1,706✔
477
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,706✔
478
             TD_DIRSEP);
479
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,706✔
480
             TD_DIRSEP);
481

482
    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
1,706✔
483
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);
484

485
    // Save keys to master.bin and derived.bin
486
    // Use the same algorithm for cfg and meta keys (backward compatible)
487
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey, keySyncRsp.metaKey,
1,706✔
488
                                keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm, keySyncRsp.algorithm,
489
                                keySyncRsp.keyVersion, keySyncRsp.createTime,
490
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
491
    if (code != 0) {
1,706✔
492
      dError("failed to save encryption keys since %s", tstrerror(code));
×
493
      goto _exit;
×
494
    }
495

496
    // Update global variables with synced keys
497
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
1,706✔
498
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
1,706✔
499
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
1,706✔
500
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
1,706✔
501
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
1,706✔
502
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
1,706✔
503
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
1,706✔
504
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
1,706✔
505
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
1,706✔
506
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;
1,706✔
507

508
    // Update local key version
509
    tsLocalKeyVersion = keySyncRsp.keyVersion;
1,706✔
510
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);
1,706✔
511

512
    // Encrypt existing plaintext config files
513
    code = taosEncryptExistingCfgFiles(tsDataDir);
1,706✔
514
    if (code != 0) {
1,706✔
515
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
×
516
      // Don't fail the key sync, files will be encrypted on next write
517
      code = 0;
×
518
    }
519
#else
520
    dWarn("enterprise features not enabled, skipping key sync");
521
#endif
522
  } else {
523
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
692,955✔
524
  }
525
  
526
  code = TSDB_CODE_SUCCESS;
694,661✔
527

528
_exit:
694,661✔
529
  rpcFreeCont(pRsp->pCont);
694,661✔
530
  return code;
694,661✔
531
}
532

533
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
701,911✔
534
  int32_t     code = 0;
701,911✔
535
  SKeySyncReq req = {0};
701,911✔
536

537
  req.dnodeId = pMgmt->pData->dnodeId;
701,911✔
538
  req.keyVersion = tsLocalKeyVersion;
701,911✔
539
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);
701,911✔
540

541
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
701,911✔
542
  if (contLen < 0) {
701,911✔
543
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
UNCOV
544
    return;
×
545
  }
546

547
  void *pHead = rpcMallocCont(contLen);
701,911✔
548
  if (pHead == NULL) {
701,911✔
549
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
550
    return;
×
551
  }
552
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
701,911✔
553
  if (contLen < 0) {
701,911✔
554
    rpcFreeCont(pHead);
×
555
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
556
    return;
×
557
  }
558

559
  SRpcMsg rpcMsg = {.pCont = pHead,
701,911✔
560
                    .contLen = contLen,
561
                    .msgType = TDMT_MND_KEY_SYNC,
562
                    .info.ahandle = 0,
563
                    .info.notFreeAhandle = 1,
564
                    .info.refId = 0,
565
                    .info.noResp = 0,
566
                    .info.handle = 0};
567
  SRpcMsg rpcRsp = {0};
701,911✔
568

569
  SEpSet epSet = {0};
701,911✔
570
  int8_t epUpdated = 0;
701,911✔
571
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
701,911✔
572

573
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
701,911✔
574
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
701,911✔
575
  if (code != 0) {
701,911✔
576
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
7,250✔
577
    return;
7,250✔
578
  }
579
  if (rpcRsp.code != 0) {
694,661✔
580
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
×
581
    return;
×
582
  }
583
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
694,661✔
584
  if (code != 0) {
694,661✔
585
    dError("failed to process key sync rsp since %s", tstrerror(code));
×
586
    return;
×
587
  }
588
}
589

590
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
730,362✔
591
  int32_t    code = 0;
730,362✔
592
  SConfigReq req = {0};
730,362✔
593

594
  req.cver = tsdmConfigVersion;
730,362✔
595
  req.forceReadConfig = true;
730,362✔
596
  req.array = taosGetGlobalCfg(tsCfg);
730,362✔
597
  dDebug("send config req to mnode, configVersion:%d", req.cver);
730,362✔
598

599
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
730,362✔
600
  if (contLen < 0) {
730,362✔
601
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
UNCOV
602
    return;
×
603
  }
604

605
  void *pHead = rpcMallocCont(contLen);
730,362✔
606
  if (pHead == NULL) {
730,362✔
607
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
608
    return;
×
609
  }
610
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
730,362✔
611
  if (contLen < 0) {
730,362✔
612
    rpcFreeCont(pHead);
×
613
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
614
    return;
×
615
  }
616

617
  SRpcMsg rpcMsg = {.pCont = pHead,
730,362✔
618
                    .contLen = contLen,
619
                    .msgType = TDMT_MND_CONFIG,
620
                    .info.ahandle = 0,
621
                    .info.notFreeAhandle = 1,
622
                    .info.refId = 0,
623
                    .info.noResp = 0,
624
                    .info.handle = 0};
625
  SRpcMsg rpcRsp = {0};
730,362✔
626

627
  SEpSet epSet = {0};
730,362✔
628
  int8_t epUpdated = 0;
730,362✔
629
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
730,362✔
630

631
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
730,362✔
632
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
730,362✔
633
  if (code != 0) {
730,362✔
634
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
6,493✔
635
    return;
6,493✔
636
  }
637
  if (rpcRsp.code != 0) {
723,869✔
638
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
639
    return;
×
640
  }
641
  dmProcessConfigRsp(pMgmt, &rpcRsp);
723,869✔
642
}
643

644
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
57,851,404✔
645
  dDebug("begin to get dnode info");
57,851,404✔
646
  SDnodeData dnodeData = {0};
57,851,404✔
647
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
57,851,404✔
648
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
57,851,404✔
649
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
57,851,404✔
650
  dnodeData.clusterId = pMgmt->pData->clusterId;
57,851,404✔
651
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
57,851,404✔
652
  dnodeData.updateTime = pMgmt->pData->updateTime;
57,851,404✔
653
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
57,851,404✔
654
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
57,851,404✔
655

656
  dDebug("begin to get vnode loads");
57,851,404✔
657
  SMonVloadInfo vinfo = {0};
57,851,404✔
658
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
57,851,404✔
659

660
  dDebug("begin to get mnode loads");
57,851,404✔
661
  SMonMloadInfo minfo = {0};
57,851,404✔
662
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
57,851,404✔
663

664
  dDebug("begin to lock status info");
57,851,404✔
665
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
57,851,404✔
666
    dError("failed to lock status info lock");
×
667
    return;
×
668
  }
669
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
57,851,404✔
670
  tsDnodeData.dnodeId = dnodeData.dnodeId;
57,851,404✔
671
  tsDnodeData.clusterId = dnodeData.clusterId;
57,851,404✔
672
  tsDnodeData.rebootTime = dnodeData.rebootTime;
57,851,404✔
673
  tsDnodeData.updateTime = dnodeData.updateTime;
57,851,404✔
674
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
57,851,404✔
675

676
  if (tsVinfo.pVloads == NULL) {
57,851,404✔
677
    tsVinfo.pVloads = vinfo.pVloads;
56,337,387✔
678
    vinfo.pVloads = NULL;
56,337,387✔
679
  } else {
680
    taosArrayDestroy(vinfo.pVloads);
1,514,017✔
681
    vinfo.pVloads = NULL;
1,514,017✔
682
  }
683

684
  tsMLoad = minfo.load;
57,851,404✔
685

686
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
57,851,404✔
687
    dError("failed to unlock status info lock");
×
688
    return;
×
689
  }
690
}
691

692
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
693
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
694
  if (contLen < 0) {
×
695
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
696
    return;
×
697
  }
698
  void *pHead = rpcMallocCont(contLen);
×
699
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
700
  if (contLen < 0) {
×
701
    rpcFreeCont(pHead);
×
702
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
703
    return;
×
704
  }
705

706
  SRpcMsg rpcMsg = {.pCont = pHead,
×
707
                    .contLen = contLen,
708
                    .msgType = TDMT_MND_NOTIFY,
709
                    .info.ahandle = 0,
710
                    .info.notFreeAhandle = 1,
711
                    .info.refId = 0,
712
                    .info.noResp = 1,
713
                    .info.handle = 0};
714

715
  SEpSet epSet = {0};
×
716
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
717
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
718
    dError("failed to send notify req");
×
719
  }
720
}
721

722
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
723
  dError("auth rsp is received, but not supported yet");
×
724
  return 0;
×
725
}
726

727
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
728
  dError("grant rsp is received, but not supported yet");
×
729
  return 0;
×
730
}
731

732
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
103,667✔
733
  int32_t       code = 0;
103,667✔
734
  SDCfgDnodeReq cfgReq = {0};
103,667✔
735
  SConfig      *pCfg = taosGetCfg();
103,667✔
736
  SConfigItem  *pItem = NULL;
103,667✔
737

738
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
103,667✔
739
    return TSDB_CODE_INVALID_MSG;
×
740
  }
741
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
103,667✔
742
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,820✔
743
  }
744

745
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
99,847✔
746

747
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
99,847✔
748
  if (code != 0) {
99,847✔
749
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
271✔
750
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
271✔
751
      return TSDB_CODE_SUCCESS;
271✔
752
    } else {
753
      return code;
×
754
    }
755
  }
756
  if (pItem == NULL) {
99,576✔
757
    return TSDB_CODE_CFG_NOT_FOUND;
×
758
  }
759

760
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
99,576✔
761
    char value[10] = {0};
×
762
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
763
      tsSyncTimeout = 0;
×
764
    }
765

766
    if (tsSyncTimeout > 0) {
×
767
      SConfigItem *pItemTmp = NULL;
×
768
      char         tmp[10] = {0};
×
769

770
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
×
771
      TAOS_CHECK_RETURN(
×
772
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
773
      if (pItemTmp == NULL) {
×
774
        return TSDB_CODE_CFG_NOT_FOUND;
×
775
      }
776

777
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
×
778
      TAOS_CHECK_RETURN(
×
779
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
780
      if (pItemTmp == NULL) {
×
781
        return TSDB_CODE_CFG_NOT_FOUND;
×
782
      }
783
      TAOS_CHECK_RETURN(
×
784
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
785
      if (pItemTmp == NULL) {
×
786
        return TSDB_CODE_CFG_NOT_FOUND;
×
787
      }
788

789
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
790
      TAOS_CHECK_RETURN(
×
791
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
792
      if (pItemTmp == NULL) {
×
793
        return TSDB_CODE_CFG_NOT_FOUND;
×
794
      }
795
      TAOS_CHECK_RETURN(
×
796
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
797
      if (pItemTmp == NULL) {
×
798
        return TSDB_CODE_CFG_NOT_FOUND;
×
799
      }
800
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
801
      if (pItemTmp == NULL) {
×
802
        return TSDB_CODE_CFG_NOT_FOUND;
×
803
      }
804

805
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
806
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
807
      if (pItemTmp == NULL) {
×
808
        return TSDB_CODE_CFG_NOT_FOUND;
×
809
      }
810

811
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
812
      TAOS_CHECK_RETURN(
×
813
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
814
      if (pItemTmp == NULL) {
×
815
        return TSDB_CODE_CFG_NOT_FOUND;
×
816
      }
817
      TAOS_CHECK_RETURN(
×
818
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
819
      if (pItemTmp == NULL) {
×
820
        return TSDB_CODE_CFG_NOT_FOUND;
×
821
      }
822
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
823
      if (pItemTmp == NULL) {
×
824
        return TSDB_CODE_CFG_NOT_FOUND;
×
825
      }
826

827
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
828
            tsSyncTimeout);
829
    }
830
  }
831

832
  if (!isConifgItemLazyMode(pItem)) {
99,576✔
833
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
98,851✔
834

835
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
98,851✔
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
838
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
839

840
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
841
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
842
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
843
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
844

845
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
846
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
847
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
848

849
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
850
            tsSyncTimeout);
851
    }
852
  }
853

854
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
99,576✔
855
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
17,630✔
856
    if (code != TSDB_CODE_SUCCESS) {
17,630✔
857
      dError("failed to persist global config since %s", tstrerror(code));
×
858
    }
859
  } else {
860
    code = taosPersistLocalConfig(pMgmt->path);
81,946✔
861
    if (code != TSDB_CODE_SUCCESS) {
81,946✔
862
      dError("failed to persist local config since %s", tstrerror(code));
×
863
    }
864
  }
865

866
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
99,576✔
867
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
868

869
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
870
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
871
  }
872

873
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
99,576✔
874
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
99,576✔
875
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
876
  }
877

878
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
99,576✔
879
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
99,576✔
880
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
881
  }
882

883
  if (cfgReq.version > 0) {
99,576✔
884
    tsdmConfigVersion = cfgReq.version;
26,318✔
885
  }
886
  return code;
99,576✔
887
}
888

889
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
214✔
890
#ifdef TD_ENTERPRISE
891
  int32_t       code = 0;
214✔
892
  SDCfgDnodeReq cfgReq = {0};
214✔
893
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
214✔
894
    code = TSDB_CODE_INVALID_MSG;
×
895
    goto _exit;
×
896
  }
897

898
  code = dmUpdateEncryptKey(cfgReq.value, true);
214✔
899
  if (code == 0) {
214✔
900
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
214✔
901
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
214✔
902
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
214✔
903
  }
904

905
_exit:
214✔
906
  pMsg->code = code;
214✔
907
  pMsg->info.rsp = NULL;
214✔
908
  pMsg->info.rspLen = 0;
214✔
909
  return code;
214✔
910
#else
911
  return 0;
912
#endif
913
}
914

915
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
916
// Verification plaintext used to validate encryption keys
917
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
918

919
// Save key verification file with encrypted plaintext for each key
920
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,738✔
921
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
922
                                     int32_t metaAlgorithm) {
923
  char    verifyFile[PATH_MAX] = {0};
2,738✔
924
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,738✔
925
                            TD_DIRSEP, TD_DIRSEP);
926
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,738✔
927
    dError("failed to build key verification file path");
×
928
    return TSDB_CODE_OUT_OF_BUFFER;
×
929
  }
930

931
  int32_t     code = 0;
2,738✔
932
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
2,738✔
933
  int32_t     plaintextLen = strlen(plaintext);
2,738✔
934

935
  // Array of keys and their algorithms
936
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
2,738✔
937
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
2,738✔
938
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
2,738✔
939

940
  // Calculate total buffer size
941
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
2,738✔
942
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
2,738✔
943
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
2,738✔
944
  int32_t totalSize = headerSize + perKeySize * 5;
2,738✔
945

946
  // Allocate buffer for all data
947
  char *buffer = taosMemoryMalloc(totalSize);
2,738✔
948
  if (buffer == NULL) {
2,738✔
949
    dError("failed to allocate memory for key verification buffer");
×
950
    return terrno;
×
951
  }
952

953
  char *ptr = buffer;
2,738✔
954

955
  // Write magic number and version to buffer
956
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
2,738✔
957
  uint16_t version = 1;
2,738✔
958
  memcpy(ptr, &magic, sizeof(magic));
2,738✔
959
  ptr += sizeof(magic);
2,738✔
960
  memcpy(ptr, &version, sizeof(version));
2,738✔
961
  ptr += sizeof(version);
2,738✔
962

963
  // Encrypt all keys and write to buffer
964
  char paddedPlaintext[512] = {0};
2,738✔
965
  memcpy(paddedPlaintext, plaintext, plaintextLen);
2,738✔
966

967
  for (int i = 0; i < 5; i++) {
16,428✔
968
    char encrypted[512] = {0};
13,690✔
969

970
    // Encrypt the verification plaintext with this key using CBC
971
    SCryptOpts opts = {0};
13,690✔
972
    opts.len = encryptedLen;
13,690✔
973
    opts.source = paddedPlaintext;
13,690✔
974
    opts.result = encrypted;
13,690✔
975
    opts.unitLen = 16;
13,690✔
976
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
13,690✔
977
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
13,690✔
978

979
    int32_t count = CBC_Encrypt(&opts);
13,690✔
980
    if (count != opts.len) {
13,690✔
981
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
982
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
983
             tstrerror(code));
984
      taosMemoryFree(buffer);
×
985
      return code;
×
986
    }
987

988
    // Write to buffer: algorithm + encrypted length + encrypted data
989
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
13,690✔
990
    ptr += sizeof(int32_t);
13,690✔
991
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
13,690✔
992
    ptr += sizeof(int32_t);
13,690✔
993
    memcpy(ptr, encrypted, encryptedLen);
13,690✔
994
    ptr += encryptedLen;
13,690✔
995

996
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
13,690✔
997
  }
998

999
  // Write all data to file in one operation
1000
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
2,738✔
1001
  if (pFile == NULL) {
2,738✔
1002
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
1003
    taosMemoryFree(buffer);
×
1004
    return TSDB_CODE_FILE_CORRUPTED;
×
1005
  }
1006

1007
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
2,738✔
1008
  (void)taosCloseFile(&pFile);
2,738✔
1009
  taosMemoryFree(buffer);
2,738✔
1010

1011
  if (written != totalSize) {
2,738✔
1012
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1013
    return TSDB_CODE_FILE_CORRUPTED;
×
1014
  }
1015

1016
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
2,738✔
1017
  return 0;
2,738✔
1018
}
1019

1020
// Verify all encryption keys by decrypting and comparing with original plaintext
1021
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,390✔
1022
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1023
                                      int32_t metaAlgorithm) {
1024
  char    verifyFile[PATH_MAX] = {0};
2,390✔
1025
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,390✔
1026
                            TD_DIRSEP, TD_DIRSEP);
1027
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,390✔
1028
    dError("failed to build key verification file path");
×
1029
    return TSDB_CODE_OUT_OF_BUFFER;
×
1030
  }
1031

1032
  // Get file size
1033
  int64_t fileSize = 0;
2,390✔
1034
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
2,390✔
1035
    // File doesn't exist, create it with current keys
1036
    dInfo("key verification file not found, creating new one");
2,390✔
1037
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,390✔
1038
  }
1039

1040
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1041
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1042
    return TSDB_CODE_FILE_CORRUPTED;
×
1043
  }
1044

1045
  // Allocate buffer and read entire file
1046
  char *buffer = taosMemoryMalloc(fileSize);
×
1047
  if (buffer == NULL) {
×
1048
    dError("failed to allocate memory for key verification buffer");
×
1049
    return terrno;
×
1050
  }
1051

1052
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1053
  if (pFile == NULL) {
×
1054
    dError("failed to open key verification file:%s", verifyFile);
×
1055
    taosMemoryFree(buffer);
×
1056
    return TSDB_CODE_FILE_CORRUPTED;
×
1057
  }
1058

1059
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1060
  (void)taosCloseFile(&pFile);
×
1061

1062
  if (bytesRead != fileSize) {
×
1063
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1064
    taosMemoryFree(buffer);
×
1065
    return TSDB_CODE_FILE_CORRUPTED;
×
1066
  }
1067

1068
  int32_t     code = 0;
×
1069
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1070
  int32_t     plaintextLen = strlen(plaintext);
×
1071
  const char *ptr = buffer;
×
1072

1073
  // Parse and verify header
1074
  uint32_t magic = 0;
×
1075
  uint16_t version = 0;
×
1076
  memcpy(&magic, ptr, sizeof(magic));
×
1077
  ptr += sizeof(magic);
×
1078
  memcpy(&version, ptr, sizeof(version));
×
1079
  ptr += sizeof(version);
×
1080

1081
  if (magic != 0x544B5659) {
×
1082
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1083
    taosMemoryFree(buffer);
×
1084
    return TSDB_CODE_FILE_CORRUPTED;
×
1085
  }
1086

1087
  // Array of keys and their algorithms
1088
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1089
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1090
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1091

1092
  // Verify each key from buffer
1093
  for (int i = 0; i < 5; i++) {
×
1094
    // Check if we have enough data remaining
1095
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1096
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1097
      taosMemoryFree(buffer);
×
1098
      return TSDB_CODE_FILE_CORRUPTED;
×
1099
    }
1100

1101
    int32_t savedAlgo = 0;
×
1102
    int32_t encryptedLen = 0;
×
1103

1104
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1105
    ptr += sizeof(int32_t);
×
1106
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1107
    ptr += sizeof(int32_t);
×
1108

1109
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1110
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1111
      taosMemoryFree(buffer);
×
1112
      return TSDB_CODE_FILE_CORRUPTED;
×
1113
    }
1114

1115
    if (ptr - buffer + encryptedLen > fileSize) {
×
1116
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1117
      taosMemoryFree(buffer);
×
1118
      return TSDB_CODE_FILE_CORRUPTED;
×
1119
    }
1120

1121
    uint8_t encrypted[512] = {0};
×
1122
    memcpy(encrypted, ptr, encryptedLen);
×
1123
    ptr += encryptedLen;
×
1124

1125
    // Decrypt with current key using CBC
1126
    char decrypted[512] = {0};
×
1127

1128
    SCryptOpts opts = {0};
×
1129
    opts.len = encryptedLen;
×
1130
    opts.source = (char *)encrypted;
×
1131
    opts.result = decrypted;
×
1132
    opts.unitLen = 16;
×
1133
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
1134
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1135

1136
    int32_t count = CBC_Decrypt(&opts);
×
1137
    if (count != opts.len) {
×
1138
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1139
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1140
             count, opts.len, tstrerror(code));
1141
      taosMemoryFree(buffer);
×
1142
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1143
    }
1144

1145
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1146
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1147
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1148
      taosMemoryFree(buffer);
×
1149
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1150
    }
1151

1152
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1153
  }
1154

1155
  taosMemoryFree(buffer);
×
1156
  dInfo("all encryption keys verified successfully");
×
1157
  return 0;
×
1158
}
1159

1160
// Public API: Verify and initialize encryption keys at startup
1161
int32_t dmVerifyAndInitEncryptionKeys(void) {
725,438✔
1162
  // Skip verification in dump sdb mode (taosd -s)
1163
  if (tsSkipKeyCheckMode) {
725,438✔
1164
    dInfo("skip encryption key verification in some special check mode");
1,584✔
1165
    return 0;
1,584✔
1166
  }
1167

1168
  // Check if encryption keys are loaded
1169
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
723,854✔
1170
    dDebug("encryption keys not loaded, skipping verification");
721,464✔
1171
    return 0;
721,464✔
1172
  }
1173

1174
  // Get key file paths
1175
  char    masterKeyFile[PATH_MAX] = {0};
2,390✔
1176
  char    derivedKeyFile[PATH_MAX] = {0};
2,390✔
1177
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
2,390✔
1178
                            TD_DIRSEP, TD_DIRSEP);
1179
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
2,390✔
1180
    dError("failed to build master key file path");
×
1181
    return TSDB_CODE_OUT_OF_BUFFER;
×
1182
  }
1183

1184
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
2,390✔
1185
                    TD_DIRSEP, TD_DIRSEP);
1186
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
2,390✔
1187
    dError("failed to build derived key file path");
×
1188
    return TSDB_CODE_OUT_OF_BUFFER;
×
1189
  }
1190

1191
  // Load encryption keys
1192
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
2,390✔
1193
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
2,390✔
1194
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
2,390✔
1195
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
2,390✔
1196
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
2,390✔
1197
  int32_t algorithm = 0;
2,390✔
1198
  int32_t cfgAlgorithm = 0;
2,390✔
1199
  int32_t metaAlgorithm = 0;
2,390✔
1200
  int32_t fileVersion = 0;
2,390✔
1201
  int32_t keyVersion = 0;
2,390✔
1202
  int64_t createTime = 0;
2,390✔
1203
  int64_t svrKeyUpdateTime = 0;
2,390✔
1204
  int64_t dbKeyUpdateTime = 0;
2,390✔
1205

1206
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
2,390✔
1207
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1208
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1209
  if (code != 0) {
2,390✔
1210
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1211
    return code;
×
1212
  }
1213

1214
  // Verify all keys
1215
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,390✔
1216
  if (code != 0) {
2,390✔
1217
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1218
    return code;
×
1219
  }
1220

1221
  dInfo("encryption keys verified and initialized successfully");
2,390✔
1222
  return 0;
2,390✔
1223
}
1224
#else
1225
int32_t dmVerifyAndInitEncryptionKeys(void) {
1226
  // Community edition or no TaosK support
1227
  return 0;
1228
}
1229
#endif
1230

1231
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1232
static int32_t dmUpdateSvrKey(const char *newKey) {
174✔
1233
  if (newKey == NULL || newKey[0] == '\0') {
174✔
1234
    dError("invalid new SVR_KEY, key is empty");
×
1235
    return TSDB_CODE_INVALID_PARA;
×
1236
  }
1237

1238
  char masterKeyFile[PATH_MAX] = {0};
174✔
1239
  char derivedKeyFile[PATH_MAX] = {0};
174✔
1240

1241
  // Build path to key files
1242
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
174✔
1243
                            TD_DIRSEP, TD_DIRSEP);
1244
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
174✔
1245
    dError("failed to build master key file path");
×
1246
    return TSDB_CODE_OUT_OF_BUFFER;
×
1247
  }
1248

1249
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
174✔
1250
                    TD_DIRSEP, TD_DIRSEP);
1251
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
174✔
1252
    dError("failed to build derived key file path");
×
1253
    return TSDB_CODE_OUT_OF_BUFFER;
×
1254
  }
1255

1256
  // Load current keys
1257
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1258
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1259
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1260
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1261
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1262
  int32_t algorithm = 0;
174✔
1263
  int32_t cfgAlgorithm = 0;
174✔
1264
  int32_t metaAlgorithm = 0;
174✔
1265
  int32_t fileVersion = 0;
174✔
1266
  int32_t keyVersion = 0;
174✔
1267
  int64_t createTime = 0;
174✔
1268
  int64_t svrKeyUpdateTime = 0;
174✔
1269
  int64_t dbKeyUpdateTime = 0;
174✔
1270

1271
  int32_t code =
1272
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
174✔
1273
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1274
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1275
  if (code != 0) {
174✔
1276
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1277
    return code;
×
1278
  }
1279

1280
  // Update SVR_KEY
1281
  int64_t now = taosGetTimestampMs();
174✔
1282
  int32_t newKeyVersion = keyVersion + 1;
174✔
1283

1284
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
174✔
1285
  tstrncpy(svrKey, newKey, sizeof(svrKey));
174✔
1286
  svrKeyUpdateTime = now;
174✔
1287

1288
  // Save updated keys (use algorithm for all keys for backward compatibility)
1289
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
174✔
1290
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1291
  if (code != 0) {
174✔
1292
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1293
    return code;
×
1294
  }
1295

1296
  // Update key verification file with new SVR_KEY
1297
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
174✔
1298
  if (code != 0) {
174✔
1299
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1300
    // Don't fail the operation if verification file update fails
1301
  }
1302

1303
  // Update global variables
1304
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
174✔
1305
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
174✔
1306
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
174✔
1307
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
174✔
1308
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
174✔
1309
  tsEncryptAlgorithmType = algorithm;
174✔
1310
  tsEncryptFileVersion = fileVersion;
174✔
1311
  tsEncryptKeyVersion = newKeyVersion;
174✔
1312
  tsEncryptKeyCreateTime = createTime;
174✔
1313
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
174✔
1314
  tsDbKeyUpdateTime = dbKeyUpdateTime;
174✔
1315

1316
  // Update encryption key status for backward compatibility
1317
  int keyLen = strlen(tsDataKey);
174✔
1318
  if (keyLen > ENCRYPT_KEY_LEN) {
174✔
1319
    keyLen = ENCRYPT_KEY_LEN;
×
1320
  }
1321
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
174✔
1322
  memcpy(tsEncryptKey, tsDataKey, keyLen);
174✔
1323
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
174✔
1324
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
174✔
1325

1326
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
174✔
1327
  return 0;
174✔
1328
}
1329

1330
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
1331
  if (days < 0) {
×
1332
    dError("invalid days value:%d, must be >= 0", days);
×
1333
    return TSDB_CODE_INVALID_PARA;
×
1334
  }
1335

1336
  if (strategy == NULL || strategy[0] == '\0') {
×
1337
    dError("invalid strategy, strategy is empty");
×
1338
    return TSDB_CODE_INVALID_PARA;
×
1339
  }
1340

1341
  // Validate strategy value
1342
  if (strcmp(strategy, "ALARM") != 0) {
×
1343
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1344
  }
1345

1346
  // Update global variables directly
1347
  tsKeyExpirationDays = days;
×
1348
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1349

1350
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
1351
  return 0;
×
1352
}
1353

1354
static int32_t dmUpdateDbKey(const char *newKey) {
174✔
1355
  if (newKey == NULL || newKey[0] == '\0') {
174✔
1356
    dError("invalid new DB_KEY, key is empty");
×
1357
    return TSDB_CODE_INVALID_PARA;
×
1358
  }
1359

1360
  char masterKeyFile[PATH_MAX] = {0};
174✔
1361
  char derivedKeyFile[PATH_MAX] = {0};
174✔
1362

1363
  // Build path to key files
1364
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
174✔
1365
                            TD_DIRSEP, TD_DIRSEP);
1366
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
174✔
1367
    dError("failed to build master key file path");
×
1368
    return TSDB_CODE_OUT_OF_BUFFER;
×
1369
  }
1370

1371
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
174✔
1372
                    TD_DIRSEP, TD_DIRSEP);
1373
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
174✔
1374
    dError("failed to build derived key file path");
×
1375
    return TSDB_CODE_OUT_OF_BUFFER;
×
1376
  }
1377

1378
  // Load current keys
1379
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1380
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1381
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1382
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1383
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
174✔
1384
  int32_t algorithm = 0;
174✔
1385
  int32_t cfgAlgorithm = 0;
174✔
1386
  int32_t metaAlgorithm = 0;
174✔
1387
  int32_t fileVersion = 0;
174✔
1388
  int32_t keyVersion = 0;
174✔
1389
  int64_t createTime = 0;
174✔
1390
  int64_t svrKeyUpdateTime = 0;
174✔
1391
  int64_t dbKeyUpdateTime = 0;
174✔
1392

1393
  int32_t code =
1394
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
174✔
1395
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1396
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1397
  if (code != 0) {
174✔
1398
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1399
    return code;
×
1400
  }
1401

1402
  // Update DB_KEY
1403
  int64_t now = taosGetTimestampMs();
174✔
1404
  int32_t newKeyVersion = keyVersion + 1;
174✔
1405

1406
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
174✔
1407
  tstrncpy(dbKey, newKey, sizeof(dbKey));
174✔
1408
  dbKeyUpdateTime = now;
174✔
1409

1410
  // Save updated keys (use algorithm for all keys for backward compatibility)
1411
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
174✔
1412
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1413
  if (code != 0) {
174✔
1414
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1415
    return code;
×
1416
  }
1417

1418
  // Update key verification file with new DB_KEY
1419
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
174✔
1420
  if (code != 0) {
174✔
1421
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1422
    // Don't fail the operation if verification file update fails
1423
  }
1424

1425
  // Update global variables
1426
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
174✔
1427
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
174✔
1428
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
174✔
1429
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
174✔
1430
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
174✔
1431
  tsEncryptAlgorithmType = algorithm;
174✔
1432
  tsEncryptFileVersion = fileVersion;
174✔
1433
  tsEncryptKeyVersion = newKeyVersion;
174✔
1434
  tsEncryptKeyCreateTime = createTime;
174✔
1435
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
174✔
1436
  tsDbKeyUpdateTime = dbKeyUpdateTime;
174✔
1437

1438
  // Update encryption key status for backward compatibility
1439
  int keyLen = strlen(tsDataKey);
174✔
1440
  if (keyLen > ENCRYPT_KEY_LEN) {
174✔
1441
    keyLen = ENCRYPT_KEY_LEN;
×
1442
  }
1443
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
174✔
1444
  memcpy(tsEncryptKey, tsDataKey, keyLen);
174✔
1445
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
174✔
1446
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
174✔
1447

1448
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
174✔
1449
  return 0;
174✔
1450
}
1451
#endif
1452

1453
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
348✔
1454
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1455
  int32_t              code = 0;
348✔
1456
  SMAlterEncryptKeyReq alterKeyReq = {0};
348✔
1457
  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &alterKeyReq) != 0) {
348✔
1458
    code = TSDB_CODE_INVALID_MSG;
×
1459
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
×
1460
    goto _exit;
×
1461
  }
1462

1463
  dInfo("received alter encrypt key req, keyType:%d", alterKeyReq.keyType);
348✔
1464

1465
  // Update the specified key (svr_key or db_key)
1466
  if (alterKeyReq.keyType == 0) {
348✔
1467
    // Update SVR_KEY
1468
    code = dmUpdateSvrKey(alterKeyReq.newKey);
174✔
1469
    if (code == 0) {
174✔
1470
      dInfo("successfully updated SVR_KEY");
174✔
1471
    } else {
1472
      dError("failed to update SVR_KEY, since %s", tstrerror(code));
×
1473
    }
1474
  } else if (alterKeyReq.keyType == 1) {
174✔
1475
    // Update DB_KEY
1476
    code = dmUpdateDbKey(alterKeyReq.newKey);
174✔
1477
    if (code == 0) {
174✔
1478
      dInfo("successfully updated DB_KEY");
174✔
1479
    } else {
1480
      dError("failed to update DB_KEY, since %s", tstrerror(code));
×
1481
    }
1482
  } else {
1483
    dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", alterKeyReq.keyType);
×
1484
    code = TSDB_CODE_INVALID_PARA;
×
1485
  }
1486

1487
_exit:
348✔
1488
  tFreeSMAlterEncryptKeyReq(&alterKeyReq);
348✔
1489
  pMsg->code = code;
348✔
1490
  pMsg->info.rsp = NULL;
348✔
1491
  pMsg->info.rspLen = 0;
348✔
1492
  return code;
348✔
1493
#else
1494
  dError("encryption key management is only available in enterprise edition");
1495
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1496
  pMsg->info.rsp = NULL;
1497
  pMsg->info.rspLen = 0;
1498
  return TSDB_CODE_OPS_NOT_SUPPORT;
1499
#endif
1500
}
1501

1502
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1503
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1504
  int32_t                 code = 0;
×
1505
  SMAlterKeyExpirationReq alterReq = {0};
×
1506
  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
×
1507
    code = TSDB_CODE_INVALID_MSG;
×
1508
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
×
1509
    goto _exit;
×
1510
  }
1511

1512
  dInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);
×
1513

1514
  // Update key expiration configuration
1515
  code = dmUpdateKeyExpiration(alterReq.days, alterReq.strategy);
×
1516
  if (code == 0) {
×
1517
    dInfo("successfully updated key expiration: %d days, strategy: %s", alterReq.days, alterReq.strategy);
×
1518
  } else {
1519
    dError("failed to update key expiration, since %s", tstrerror(code));
×
1520
  }
1521

1522
_exit:
×
1523
  tFreeSMAlterKeyExpirationReq(&alterReq);
×
1524
  pMsg->code = code;
×
1525
  pMsg->info.rsp = NULL;
×
1526
  pMsg->info.rspLen = 0;
×
1527
  return code;
×
1528
#else
1529
  dError("key expiration management is only available in enterprise edition");
1530
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1531
  pMsg->info.rsp = NULL;
1532
  pMsg->info.rspLen = 0;
1533
  return TSDB_CODE_OPS_NOT_SUPPORT;
1534
#endif
1535
}
1536

1537
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1538
  int32_t code = 0;
×
1539
  int32_t lino = 0;
×
1540
  SMsgCb *msgCb = &pMgmt->msgCb;
×
1541
  void *pTransCli = msgCb->clientRpc;
×
1542
  void *pTransStatus = msgCb->statusRpc;  
×
1543
  void *pTransSync = msgCb->syncRpc; 
×
1544
  void *pTransServer = msgCb->serverRpc;
×
1545

1546
  code = rpcReloadTlsConfig(pTransServer, TAOS_CONN_SERVER);
×
1547
  if (code != 0) {
×
1548
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(code));
×
1549
    goto _error;
×
1550
  }
1551

1552
  code = rpcReloadTlsConfig(pTransCli, TAOS_CONN_CLIENT);
×
1553
  if (code != 0) {
×
1554
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(code));
×
1555
    goto _error;
×
1556
  }
1557

1558
  code = rpcReloadTlsConfig(pTransStatus, TAOS_CONN_CLIENT);
×
1559
  if (code != 0) {
×
1560
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(code));
×
1561
    goto _error;
×
1562
  }
1563

1564
  code = rpcReloadTlsConfig(pTransSync, TAOS_CONN_CLIENT);
×
1565
  if (code != 0) {
×
1566
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(code));
×
1567
    goto _error;
×
1568
  }
1569

1570
_error:
×
1571
  
1572
  return code;
×
1573
}
1574

1575
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
263✔
1576
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
263✔
1577
  pStatus->details[0] = 0;
263✔
1578

1579
  SMonMloadInfo minfo = {0};
263✔
1580
  (*pMgmt->getMnodeLoadsFp)(&minfo);
263✔
1581
  if (minfo.isMnode &&
263✔
1582
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
263✔
1583
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1584
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
1585
    return;
×
1586
  }
1587

1588
  SMonVloadInfo vinfo = {0};
263✔
1589
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
263✔
1590
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
789✔
1591
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
526✔
1592
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
526✔
1593
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1594
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
1595
               syncStr(pLoad->syncState));
×
1596
      break;
×
1597
    }
1598
  }
1599

1600
  taosArrayDestroy(vinfo.pVloads);
263✔
1601
}
1602

1603
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
263✔
1604
  int32_t code = 0;
263✔
1605
  dDebug("server run status req is received");
263✔
1606
  SServerStatusRsp statusRsp = {0};
263✔
1607
  dmGetServerRunStatus(pMgmt, &statusRsp);
263✔
1608

1609
  pMsg->info.rsp = NULL;
263✔
1610
  pMsg->info.rspLen = 0;
263✔
1611

1612
  SRpcMsg rspMsg = {.info = pMsg->info};
263✔
1613
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
263✔
1614
  if (rspLen < 0) {
263✔
1615
    return TSDB_CODE_OUT_OF_MEMORY;
×
1616
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1617
    // return rspMsg.code;
1618
  }
1619

1620
  void *pRsp = rpcMallocCont(rspLen);
263✔
1621
  if (pRsp == NULL) {
263✔
1622
    return terrno;
×
1623
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1624
    // return rspMsg.code;
1625
  }
1626

1627
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
263✔
1628
  if (rspLen < 0) {
263✔
1629
    return TSDB_CODE_INVALID_MSG;
×
1630
  }
1631

1632
  pMsg->info.rsp = pRsp;
263✔
1633
  pMsg->info.rspLen = rspLen;
263✔
1634
  return 0;
263✔
1635
}
1636

1637
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
32,941✔
1638
  int32_t code = 0;
32,941✔
1639

1640
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
32,941✔
1641
  if (pBlock == NULL) {
32,941✔
1642
    return terrno;
×
1643
  }
1644

1645
  size_t size = 0;
32,941✔
1646

1647
  const SSysTableMeta *pMeta = NULL;
32,941✔
1648
  getInfosDbMeta(&pMeta, &size);
32,941✔
1649

1650
  int32_t index = 0;
32,941✔
1651
  for (int32_t i = 0; i < size; ++i) {
691,761✔
1652
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
691,761✔
1653
      index = i;
32,941✔
1654
      break;
32,941✔
1655
    }
1656
  }
1657

1658
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
32,941✔
1659
  if (pBlock->pDataBlock == NULL) {
32,941✔
1660
    code = terrno;
×
1661
    goto _exit;
×
1662
  }
1663

1664
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
230,587✔
1665
    SColumnInfoData colInfoData = {0};
197,646✔
1666
    colInfoData.info.colId = i + 1;
197,646✔
1667
    colInfoData.info.type = pMeta[index].schema[i].type;
197,646✔
1668
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
197,646✔
1669
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
395,292✔
1670
      code = terrno;
×
1671
      goto _exit;
×
1672
    }
1673
  }
1674

1675
  pBlock->info.hasVarCol = true;
32,941✔
1676
_exit:
32,941✔
1677
  if (code != 0) {
32,941✔
1678
    blockDataDestroy(pBlock);
×
1679
  } else {
1680
    *ppBlock = pBlock;
32,941✔
1681
  }
1682
  return code;
32,941✔
1683
}
1684

1685
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
32,941✔
1686
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL, SHOW_VAR_PRIV_ALL);
32,941✔
1687
  if (code != 0) {
32,941✔
1688
    return code;
×
1689
  }
1690

1691
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
32,941✔
1692
  if (pColInfo == NULL) {
32,941✔
1693
    return TSDB_CODE_OUT_OF_RANGE;
×
1694
  }
1695

1696
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
32,941✔
1697
}
1698

1699
static int32_t dmBuildCpuAllocationBlock(SSDataBlock **ppBlock) {
2,233✔
1700
  int32_t      code = 0;
2,233✔
1701
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
2,233✔
1702
  if (pBlock == NULL) return terrno;
2,233✔
1703

1704
  size_t               size = 0;
2,233✔
1705
  const SSysTableMeta *pMeta = NULL;
2,233✔
1706
  getInfosDbMeta(&pMeta, &size);
2,233✔
1707

1708
  int32_t index = -1;
2,233✔
1709
  for (int32_t i = 0; i < (int32_t)size; ++i) {
49,126✔
1710
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_CPU_ALLOCATION) == 0) {
49,126✔
1711
      index = i;
2,233✔
1712
      break;
2,233✔
1713
    }
1714
  }
1715
  if (index < 0) {
2,233✔
1716
    taosMemoryFree(pBlock);
×
1717
    return TSDB_CODE_INTERNAL_ERROR;
×
1718
  }
1719

1720
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
2,233✔
1721
  if (pBlock->pDataBlock == NULL) {
2,233✔
1722
    code = terrno;
×
1723
    goto _exit_cpu;
×
1724
  }
1725

1726
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
13,398✔
1727
    SColumnInfoData colInfoData = {0};
11,165✔
1728
    colInfoData.info.colId = i + 1;
11,165✔
1729
    colInfoData.info.type = pMeta[index].schema[i].type;
11,165✔
1730
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
11,165✔
1731
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
22,330✔
1732
      code = terrno;
×
1733
      goto _exit_cpu;
×
1734
    }
1735
  }
1736

1737
  pBlock->info.hasVarCol = true;
2,233✔
1738
_exit_cpu:
2,233✔
1739
  if (code != 0) {
2,233✔
1740
    blockDataDestroy(pBlock);
×
1741
  } else {
1742
    *ppBlock = pBlock;
2,233✔
1743
  }
1744
  return code;
2,233✔
1745
}
1746

1747
static int32_t dmAppendCpuAllocationRow(SSDataBlock *pBlock, int32_t dnodeId, const char *category, int32_t cores,
6,699✔
1748
                                        const char *coreIds, bool enabled) {
1749
  int32_t code = 0;
6,699✔
1750
  int32_t row = pBlock->info.rows;
6,699✔
1751

1752
  // column 0: dnode_id (INT)
1753
  SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
6,699✔
1754
  if (pCol0 == NULL) return TSDB_CODE_OUT_OF_RANGE;
6,699✔
1755
  code = colDataSetVal(pCol0, row, (const char *)&dnodeId, false);
6,699✔
1756
  if (code) return code;
6,699✔
1757

1758
  // column 1: thread_category (VARCHAR)
1759
  SColumnInfoData *pCol1 = taosArrayGet(pBlock->pDataBlock, 1);
6,699✔
1760
  if (pCol1 == NULL) return TSDB_CODE_OUT_OF_RANGE;
6,699✔
1761
  char catBuf[16 + VARSTR_HEADER_SIZE] = {0};
6,699✔
1762
  STR_TO_VARSTR(catBuf, category);
6,699✔
1763
  code = colDataSetVal(pCol1, row, catBuf, false);
6,699✔
1764
  if (code) return code;
6,699✔
1765

1766
  // column 2: cores (INT)
1767
  SColumnInfoData *pCol2 = taosArrayGet(pBlock->pDataBlock, 2);
6,699✔
1768
  if (pCol2 == NULL) return TSDB_CODE_OUT_OF_RANGE;
6,699✔
1769
  code = colDataSetVal(pCol2, row, (const char *)&cores, false);
6,699✔
1770
  if (code) return code;
6,699✔
1771

1772
  // column 3: core_ids (VARCHAR)
1773
  SColumnInfoData *pCol3 = taosArrayGet(pBlock->pDataBlock, 3);
6,699✔
1774
  if (pCol3 == NULL) return TSDB_CODE_OUT_OF_RANGE;
6,699✔
1775
  char idsBuf[256 + VARSTR_HEADER_SIZE] = {0};
6,699✔
1776
  STR_TO_VARSTR(idsBuf, coreIds);
6,699✔
1777
  code = colDataSetVal(pCol3, row, idsBuf, false);
6,699✔
1778
  if (code) return code;
6,699✔
1779

1780
  // column 4: enabled (BOOL)
1781
  SColumnInfoData *pCol4 = taosArrayGet(pBlock->pDataBlock, 4);
6,699✔
1782
  if (pCol4 == NULL) return TSDB_CODE_OUT_OF_RANGE;
6,699✔
1783
  int8_t boolVal = enabled ? 1 : 0;
6,699✔
1784
  code = colDataSetVal(pCol4, row, (const char *)&boolVal, false);
6,699✔
1785
  if (code) return code;
6,699✔
1786

1787
  pBlock->info.rows++;
6,699✔
1788
  return 0;
6,699✔
1789
}
1790

1791
static int32_t dmFillCpuAllocationBlock(SSDataBlock *pBlock, int32_t dnodeId) {
2,233✔
1792
  int32_t                code = 0;
2,233✔
1793
  const SCpuAllocStatus *status = taosGetCpuAllocStatus();
2,233✔
1794
  const char            *catNames[] = {"management", "write", "read"};
2,233✔
1795

1796
  code = blockDataEnsureCapacity(pBlock, THREAD_CAT_COUNT);
2,233✔
1797
  if (code) return code;
2,233✔
1798

1799
  for (int32_t c = 0; c < THREAD_CAT_COUNT; c++) {
8,932✔
1800
    char    coreIdsBuf[256] = {0};
6,699✔
1801
    int32_t cores = 0;
6,699✔
1802
    bool    enabled = false;
6,699✔
1803

1804
    if (status->enabled) {
6,699✔
1805
      cores = status->sets[c].count;
5,259✔
1806
      enabled = true;
5,259✔
1807
      int off = 0;
5,259✔
1808
      for (int32_t i = 0; i < status->sets[c].count && off < (int)sizeof(coreIdsBuf) - 8; i++) {
75,379✔
1809
        off +=
70,120✔
1810
            snprintf(coreIdsBuf + off, sizeof(coreIdsBuf) - off, "%s%d", i > 0 ? "," : "", status->sets[c].coreIds[i]);
70,120✔
1811
      }
1812
    } else {
1813
      tstrncpy(coreIdsBuf, "-", sizeof(coreIdsBuf));
1,440✔
1814
    }
1815

1816
    code = dmAppendCpuAllocationRow(pBlock, dnodeId, catNames[c], cores, coreIdsBuf, enabled);
6,699✔
1817
    if (code) return code;
6,699✔
1818
  }
1819
  return 0;
2,233✔
1820
}
1821

1822
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
35,174✔
1823
  int32_t           size = 0;
35,174✔
1824
  int32_t           rowsRead = 0;
35,174✔
1825
  int32_t           code = 0;
35,174✔
1826
  SRetrieveTableReq retrieveReq = {0};
35,174✔
1827
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
35,174✔
1828
    return TSDB_CODE_INVALID_MSG;
×
1829
  }
1830
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
35,174✔
1831
#if 0
1832
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
1833
    code = TSDB_CODE_MND_NO_RIGHTS;
1834
    return code;
1835
  }
1836
#endif
1837
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES) != 0 &&
35,174✔
1838
      strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_CPU_ALLOCATION) != 0) {
2,233✔
1839
    return TSDB_CODE_INVALID_MSG;
×
1840
  }
1841

1842
  SSDataBlock *pBlock = NULL;
35,174✔
1843

1844
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_CPU_ALLOCATION) == 0) {
35,174✔
1845
    if ((code = dmBuildCpuAllocationBlock(&pBlock)) != 0) {
2,233✔
1846
      return code;
×
1847
    }
1848
    code = dmFillCpuAllocationBlock(pBlock, pMgmt->pData->dnodeId);
2,233✔
1849
    if (code != 0) {
2,233✔
1850
      blockDataDestroy(pBlock);
×
1851
      return code;
×
1852
    }
1853
  } else {
1854
    if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
32,941✔
1855
      return code;
×
1856
    }
1857

1858
    code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
32,941✔
1859
    if (code != 0) {
32,941✔
1860
      blockDataDestroy(pBlock);
×
1861
      return code;
×
1862
    }
1863
  }
1864

1865
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
35,174✔
1866
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
35,174✔
1867
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
35,174✔
1868

1869
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
35,174✔
1870
  if (pRsp == NULL) {
35,174✔
1871
    code = terrno;
×
1872
    dError("failed to retrieve data since %s", tstrerror(code));
×
1873
    blockDataDestroy(pBlock);
×
1874
    return code;
×
1875
  }
1876

1877
  char *pStart = pRsp->data;
35,174✔
1878
  *(int32_t *)pStart = htonl(numOfCols);
35,174✔
1879
  pStart += sizeof(int32_t);  // number of columns
35,174✔
1880

1881
  for (int32_t i = 0; i < numOfCols; ++i) {
243,985✔
1882
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
208,811✔
1883
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
208,811✔
1884

1885
    pSchema->bytes = htonl(pColInfo->info.bytes);
208,811✔
1886
    pSchema->colId = htons(pColInfo->info.colId);
208,811✔
1887
    pSchema->type = pColInfo->info.type;
208,811✔
1888

1889
    pStart += sizeof(SSysTableSchema);
208,811✔
1890
  }
1891

1892
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
35,174✔
1893
  if (len < 0) {
35,174✔
1894
    dError("failed to retrieve data since %s", tstrerror(code));
×
1895
    blockDataDestroy(pBlock);
×
1896
    rpcFreeCont(pRsp);
×
1897
    return terrno;
×
1898
  }
1899

1900
  pRsp->numOfRows = htonl(pBlock->info.rows);
35,174✔
1901
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
35,174✔
1902
  pRsp->completed = 1;
35,174✔
1903
  pMsg->info.rsp = pRsp;
35,174✔
1904
  pMsg->info.rspLen = size;
35,174✔
1905
  dDebug("dnode variables retrieve completed");
35,174✔
1906

1907
  blockDataDestroy(pBlock);
35,174✔
1908
  return TSDB_CODE_SUCCESS;
35,174✔
1909
}
1910

1911
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
20,195,556✔
1912
  SMStreamHbRspMsg rsp = {0};
20,195,556✔
1913
  int32_t          code = 0;
20,195,556✔
1914
  SDecoder         decoder;
20,192,125✔
1915
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
20,195,556✔
1916
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
20,195,556✔
1917
  int64_t          currTs = taosGetTimestampMs();
20,195,556✔
1918

1919
  if (pMsg->code) {
20,195,556✔
1920
    return streamHbHandleRspErr(pMsg->code, currTs);
191,727✔
1921
  }
1922

1923
  tDecoderInit(&decoder, (uint8_t*)msg, len);
20,003,829✔
1924
  code = tDecodeStreamHbRsp(&decoder, &rsp);
20,003,829✔
1925
  if (code < 0) {
20,003,829✔
1926
    code = TSDB_CODE_INVALID_MSG;
×
1927
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
1928
    tDecoderClear(&decoder);
×
1929
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
1930
    return streamHbHandleRspErr(code, currTs);
×
1931
  }
1932

1933
  tDecoderClear(&decoder);
20,003,829✔
1934

1935
  return streamHbProcessRspMsg(&rsp);
20,003,829✔
1936
}
1937

1938

1939
SArray *dmGetMsgHandles() {
725,694✔
1940
  int32_t code = -1;
725,694✔
1941
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
725,694✔
1942
  if (pArray == NULL) {
725,694✔
1943
    return NULL;
×
1944
  }
1945

1946
  // Requests handled by DNODE
1947
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1951
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1952
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1953
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1954
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1955
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1956
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1957
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1958
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1959
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1960
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1961
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1962
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1963
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1964
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1965

1966
  // Requests handled by MNODE
1967
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1968
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
725,694✔
1970

1971
  code = 0;
725,694✔
1972

1973
_OVER:
725,694✔
1974
  if (code != 0) {
725,694✔
1975
    taosArrayDestroy(pArray);
×
1976
    return NULL;
×
1977
  } else {
1978
    return pArray;
725,694✔
1979
  }
1980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc