• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.92
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void     setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId);
35

36
#ifndef TD_ENTERPRISE
37
void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
void getAuditDbNameToken(char *pDb, char *pToken) {}
44
#endif
45

46
extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId);
47

48
#ifndef TD_ENTERPRISE
49
void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {}
50
#endif
51

52
SMonVloadInfo tsVinfo = {0};
53
SMnodeLoad    tsMLoad = {0};
54
SDnodeData    tsDnodeData = {0};
55

56
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
2,013,193✔
57
  int32_t code = 0;
2,013,193✔
58
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
2,013,193✔
59
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
502,923✔
60
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
502,923✔
61
    pMgmt->pData->dnodeId = pCfg->dnodeId;
502,923✔
62
    pMgmt->pData->clusterId = pCfg->clusterId;
502,923✔
63
    monSetDnodeId(pCfg->dnodeId);
502,923✔
64
    auditSetDnodeId(pCfg->dnodeId);
502,923✔
65
    code = dmWriteEps(pMgmt->pData);
502,923✔
66
    if (code != 0) {
502,923✔
67
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
2,696✔
68
            tstrerror(code));
69
    }
70
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
502,923✔
71
  }
72
}
2,013,193✔
73

74
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,183,866✔
75
  int32_t code = 0;
2,183,866✔
76
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,183,866✔
77
  if (pMgmt->pData->ipWhiteVer == ver) {
2,183,866✔
78
    if (ver == 0) {
2,183,445✔
79
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,182,755✔
80
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,182,755✔
81
        dError("failed to disable ip white list on dnode");
×
82
      }
83
    }
84
    return;
2,183,445✔
85
  }
86
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
421✔
87

88
  SRetrieveWhiteListReq req = {.ver = oldVer};
421✔
89
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
421✔
90
  if (contLen < 0) {
421✔
91
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
92
    return;
×
93
  }
94
  void *pHead = rpcMallocCont(contLen);
421✔
95
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
421✔
96
  if (contLen < 0) {
421✔
97
    rpcFreeCont(pHead);
×
98
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
99
    return;
×
100
  }
101

102
  SRpcMsg rpcMsg = {.pCont = pHead,
421✔
103
                    .contLen = contLen,
104
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
105
                    .info.ahandle = 0,
106
                    .info.notFreeAhandle = 1,
107
                    .info.refId = 0,
108
                    .info.noResp = 0,
109
                    .info.handle = 0};
110
  SEpSet  epset = {0};
421✔
111

112
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
421✔
113

114
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
421✔
115
  if (code != 0) {
421✔
116
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
117
  }
118
}
119

120

121

122
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,183,866✔
123
  int32_t code = 0;
2,183,866✔
124
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,183,866✔
125
  if (pMgmt->pData->timeWhiteVer == ver) {
2,183,866✔
126
    if (ver == 0) {
2,183,445✔
127
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,182,755✔
128
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,182,755✔
129
        dError("failed to disable time white list on dnode");
×
130
      }
131
    }
132
    return;
2,183,445✔
133
  }
134
  int64_t oldVer = pMgmt->pData->timeWhiteVer;
421✔
135

136
  SRetrieveWhiteListReq req = {.ver = oldVer};
421✔
137
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
421✔
138
  if (contLen < 0) {
421✔
139
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
×
140
    return;
×
141
  }
142
  void *pHead = rpcMallocCont(contLen);
421✔
143
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
421✔
144
  if (contLen < 0) {
421✔
145
    rpcFreeCont(pHead);
×
146
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
×
147
    return;
×
148
  }
149

150
  SRpcMsg rpcMsg = {.pCont = pHead,
421✔
151
                    .contLen = contLen,
152
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
153
                    .info.ahandle = 0,
154
                    .info.notFreeAhandle = 1,
155
                    .info.refId = 0,
156
                    .info.noResp = 0,
157
                    .info.handle = 0};
158
  SEpSet  epset = {0};
421✔
159

160
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
421✔
161

162
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
421✔
163
  if (code != 0) {
421✔
164
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
×
165
  }
166
}
167

168

169

170
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
2,183,866✔
171
  int32_t code = 0;
2,183,866✔
172
  int64_t oldVer = taosAnalyGetVersion();
2,183,866✔
173
  if (oldVer == newVer) return;
2,183,866✔
174
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
175

176
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
177
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
178
  if (contLen < 0) {
×
179
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
180
    return;
×
181
  }
182

183
  void *pHead = rpcMallocCont(contLen);
×
184
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
185
  if (contLen < 0) {
×
186
    rpcFreeCont(pHead);
×
187
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
188
    return;
×
189
  }
190

191
  SRpcMsg rpcMsg = {
×
192
      .pCont = pHead,
193
      .contLen = contLen,
194
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
195
      .info.ahandle = 0,
196
      .info.refId = 0,
197
      .info.noResp = 0,
198
      .info.handle = 0,
199
  };
200
  SEpSet epset = {0};
×
201

202
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
203

204
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
205
  if (code != 0) {
×
206
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
207
  }
208
}
209

210
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
51,913,879✔
211
  const STraceId *trace = &pRsp->info.traceId;
51,913,879✔
212
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
51,913,879✔
213

214
  if (pRsp->code != 0) {
51,913,879✔
215
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
463,304✔
216
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
7,882✔
217
             pMgmt->statusSeq);
218
      pMgmt->pData->dropped = 1;
7,882✔
219
      if (dmWriteEps(pMgmt->pData) != 0) {
7,882✔
220
        dError("failed to write dnode file");
×
221
      }
222
      dInfo("dnode will exit since it is in the dropped state");
7,882✔
223
      (void)raise(SIGINT);
7,882✔
224
    }
225
  } else {
226
    SStatusRsp statusRsp = {0};
51,450,575✔
227
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
53,634,441✔
228
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
2,183,866✔
229
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
2,183,866✔
230
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
2,013,193✔
231
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
232
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
2,013,193✔
233
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
2,013,193✔
234
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
2,013,193✔
235
      }
236
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId);
2,183,866✔
237
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
2,183,866✔
238
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
2,183,866✔
239
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
2,183,866✔
240
    }
241
    tFreeSStatusRsp(&statusRsp);
51,450,575✔
242
  }
243
  rpcFreeCont(pRsp->pCont);
51,913,879✔
244
}
51,913,879✔
245

246
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
52,003,929✔
247
  int32_t    code = 0;
52,003,929✔
248
  SStatusReq req = {0};
52,003,929✔
249
  req.timestamp = taosGetTimestampMs();
52,003,929✔
250
  pMgmt->statusSeq++;
52,003,929✔
251

252
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
52,003,929✔
253
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
52,003,929✔
254
    dError("failed to lock status info lock");
×
255
    return;
×
256
  }
257

258
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
52,003,929✔
259
  req.sver = tsVersion;
52,003,929✔
260
  req.dnodeVer = tsDnodeData.dnodeVer;
52,003,929✔
261
  req.dnodeId = tsDnodeData.dnodeId;
52,003,929✔
262
  req.clusterId = tsDnodeData.clusterId;
52,003,929✔
263
  if (req.clusterId == 0) req.dnodeId = 0;
52,003,929✔
264
  req.rebootTime = tsDnodeData.rebootTime;
52,003,929✔
265
  req.updateTime = tsDnodeData.updateTime;
52,003,929✔
266
  req.numOfCores = tsNumOfCores;
52,003,929✔
267
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
52,003,929✔
268
  req.numOfDiskCfg = tsDiskCfgNum;
52,003,929✔
269
  req.memTotal = tsTotalMemoryKB * 1024;
52,003,929✔
270
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
52,003,929✔
271
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
52,003,929✔
272
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
52,003,929✔
273

274
  req.clusterCfg.statusInterval = tsStatusInterval;
52,003,929✔
275
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
52,003,929✔
276
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
52,003,929✔
277
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
52,003,929✔
278
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
52,003,929✔
279
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
52,003,929✔
280
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
52,003,929✔
281
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
52,003,929✔
282
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
52,003,929✔
283
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
52,003,929✔
284
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
52,003,929✔
285
  req.clusterCfg.checkTime = (int64_t)taosGetLocalTimezoneOffset(&code);
52,003,929✔
286
  if (code != 0) {
52,003,929✔
287
    dError("failed to get local timezone offset, since %s", tstrerror(code));
×
288
    (void)taosThreadMutexUnlock(&pMgmt->pData->statusInfolock);
×
289
    return;
×
290
  }
291

292
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
52,003,929✔
293
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
52,003,929✔
294
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
52,003,929✔
295
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
52,003,929✔
296

297
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
52,003,929✔
298

299
  req.pVloads = tsVinfo.pVloads;
52,003,929✔
300
  tsVinfo.pVloads = NULL;
52,003,929✔
301

302
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
52,003,929✔
303
  req.mload = tsMLoad;
52,003,929✔
304

305
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
52,003,929✔
306
    dError("failed to unlock status info lock");
×
307
    return;
×
308
  }
309

310
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
52,003,929✔
311
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
52,003,929✔
312

313
  req.statusSeq = pMgmt->statusSeq;
52,003,929✔
314
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
52,003,929✔
315
  req.analVer = taosAnalyGetVersion();
52,003,929✔
316
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;
52,003,929✔
317

318
  if (tsAuditUseToken) {
52,003,929✔
319
    getAuditDbNameToken(req.auditDB, req.auditToken);
52,000,718✔
320
  }
321

322
  if (tsAuditSaveInSelf) {
52,003,929✔
323
    getAuditEpSet(&req.auditEpSet, &req.auditVgId);
1,876✔
324
  }
325

326
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
52,003,929✔
327
  if (contLen < 0) {
52,003,929✔
328
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
329
    return;
×
330
  }
331

332
  void *pHead = rpcMallocCont(contLen);
52,003,929✔
333
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
52,003,929✔
334
  if (contLen < 0) {
52,003,929✔
335
    rpcFreeCont(pHead);
×
336
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
337
    return;
×
338
  }
339
  tFreeSStatusReq(&req);
52,003,929✔
340

341
  SRpcMsg rpcMsg = {.pCont = pHead,
52,003,929✔
342
                    .contLen = contLen,
343
                    .msgType = TDMT_MND_STATUS,
344
                    .info.ahandle = 0,
345
                    .info.notFreeAhandle = 1,
346
                    .info.refId = 0,
347
                    .info.noResp = 0,
348
                    .info.handle = 0};
349
  SRpcMsg rpcRsp = {0};
52,003,929✔
350

351
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
52,003,929✔
352

353
  SEpSet epSet = {0};
52,003,929✔
354
  int8_t epUpdated = 0;
52,003,929✔
355
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
52,003,929✔
356

357
  if (dDebugFlag & DEBUG_TRACE) {
52,003,929✔
358
    char tbuf[512];
2,184,499✔
359
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
2,188,118✔
360
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
2,188,118✔
361
  }
362
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
52,003,929✔
363
  if (code != 0) {
52,003,929✔
364
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
90,050✔
365
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
90,050✔
366
      dmRotateMnodeEpSet(pMgmt->pData);
90,050✔
367
      char tbuf[512];
90,050✔
368
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
90,050✔
369
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
90,050✔
370
            tbuf, epSet.inUse);
371
    }
372
    return;
90,050✔
373
  }
374

375
  if (rpcRsp.code != 0) {
51,913,879✔
376
    dmRotateMnodeEpSet(pMgmt->pData);
463,304✔
377
    char tbuf[512];
463,304✔
378
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
463,304✔
379
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
463,304✔
380
          tbuf, epSet.inUse);
381
  } else {
382
    if (epUpdated == 1) {
51,450,575✔
383
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
104,933✔
384
    }
385
  }
386
  dmProcessStatusRsp(pMgmt, &rpcRsp);
51,913,879✔
387
}
388

389
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
648,705✔
390
  const STraceId *trace = &pRsp->info.traceId;
648,705✔
391
  int32_t         code = 0;
648,705✔
392
  SConfigRsp      configRsp = {0};
648,705✔
393
  bool            needStop = false;
648,705✔
394

395
  if (pRsp->code != 0) {
648,705✔
396
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
397
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
398
      pMgmt->pData->dropped = 1;
×
399
      if (dmWriteEps(pMgmt->pData) != 0) {
×
400
        dError("failed to write dnode file");
×
401
      }
402
      dInfo("dnode will exit since it is in the dropped state");
×
403
      (void)raise(SIGINT);
×
404
    }
405
  } else {
406
    bool needUpdate = false;
648,705✔
407
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
1,297,410✔
408
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
648,705✔
409
      // Try to use cfg from mnode sdb.
410
      if (!configRsp.isVersionVerified) {
648,705✔
411
        uInfo("config version not verified, update config");
506,024✔
412
        needUpdate = true;
506,024✔
413
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
506,024✔
414
        if (code != TSDB_CODE_SUCCESS) {
506,024✔
415
          dError("failed to persist global config since %s", tstrerror(code));
2,051✔
416
          goto _exit;
2,051✔
417
        }
418
      }
419
    }
420
    if (needUpdate) {
646,654✔
421
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
503,973✔
422
      if (code != TSDB_CODE_SUCCESS) {
503,973✔
423
        dError("failed to update config since %s", tstrerror(code));
×
424
        goto _exit;
×
425
      }
426
      code = setAllConfigs(tsCfg);
503,973✔
427
      if (code != TSDB_CODE_SUCCESS) {
503,973✔
428
        dError("failed to set all configs since %s", tstrerror(code));
1,568✔
429
        goto _exit;
1,568✔
430
      }
431
    }
432
    code = taosPersistLocalConfig(pMgmt->path);
645,086✔
433
    if (code != TSDB_CODE_SUCCESS) {
645,086✔
UNCOV
434
      dError("failed to persist local config since %s", tstrerror(code));
×
435
    }
436
    tsConfigInited = 1;
645,086✔
437
  }
438
_exit:
648,705✔
439
  tFreeSConfigRsp(&configRsp);
648,705✔
440
  rpcFreeCont(pRsp->pCont);
648,705✔
441
  if (needStop) {
648,705✔
442
    dmStop();
×
443
  }
444
}
648,705✔
445

446
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
620,786✔
447
  const STraceId *trace = &pRsp->info.traceId;
620,786✔
448
  int32_t         code = 0;
620,786✔
449
  SKeySyncRsp     keySyncRsp = {0};
620,786✔
450

451
  if (pRsp->code != 0) {
620,786✔
452
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
×
453
    code = pRsp->code;
×
454
    goto _exit;
×
455
  }
456

457
  if (pRsp->pCont == NULL || pRsp->contLen <= 0) {
620,786✔
458
    dError("invalid key sync response, empty content");
×
459
    code = TSDB_CODE_INVALID_MSG;
×
460
    goto _exit;
×
461
  }
462

463
  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
620,786✔
464
  if (code != 0) {
620,786✔
465
    dError("failed to deserialize key sync response since %s", tstrerror(code));
×
466
    goto _exit;
×
467
  }
468

469
  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
620,786✔
470
        tsLocalKeyVersion, keySyncRsp.needUpdate);
471
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;
620,786✔
472
  if (keySyncRsp.needUpdate) {
620,786✔
473
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
474
    // Get encrypt file path from tsDataDir
475
    char masterKeyFile[PATH_MAX] = {0};
1,596✔
476
    char derivedKeyFile[PATH_MAX] = {0};
1,596✔
477
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,596✔
478
             TD_DIRSEP);
479
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,596✔
480
             TD_DIRSEP);
481

482
    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
1,596✔
483
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);
484

485
    // Save keys to master.bin and derived.bin
486
    // Use the same algorithm for cfg and meta keys (backward compatible)
487
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey, keySyncRsp.metaKey,
1,596✔
488
                                keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm, keySyncRsp.algorithm,
489
                                keySyncRsp.keyVersion, keySyncRsp.createTime,
490
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
491
    if (code != 0) {
1,596✔
492
      dError("failed to save encryption keys since %s", tstrerror(code));
×
493
      goto _exit;
×
494
    }
495

496
    // Update global variables with synced keys
497
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
1,596✔
498
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
1,596✔
499
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
1,596✔
500
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
1,596✔
501
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
1,596✔
502
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
1,596✔
503
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
1,596✔
504
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
1,596✔
505
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
1,596✔
506
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;
1,596✔
507

508
    // Update local key version
509
    tsLocalKeyVersion = keySyncRsp.keyVersion;
1,596✔
510
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);
1,596✔
511

512
    // Encrypt existing plaintext config files
513
    code = taosEncryptExistingCfgFiles(tsDataDir);
1,596✔
514
    if (code != 0) {
1,596✔
515
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
×
516
      // Don't fail the key sync, files will be encrypted on next write
517
      code = 0;
×
518
    }
519
#else
520
    dWarn("enterprise features not enabled, skipping key sync");
521
#endif
522
  } else {
523
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
619,190✔
524
  }
525
  
526
  code = TSDB_CODE_SUCCESS;
620,786✔
527

528
_exit:
620,786✔
529
  rpcFreeCont(pRsp->pCont);
620,786✔
530
  return code;
620,786✔
531
}
532

533
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
628,352✔
534
  int32_t     code = 0;
628,352✔
535
  SKeySyncReq req = {0};
628,352✔
536

537
  req.dnodeId = pMgmt->pData->dnodeId;
628,352✔
538
  req.keyVersion = tsLocalKeyVersion;
628,352✔
539
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);
628,352✔
540

541
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
628,352✔
542
  if (contLen < 0) {
628,352✔
543
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
544
    return;
18✔
545
  }
546

547
  void *pHead = rpcMallocCont(contLen);
628,352✔
548
  if (pHead == NULL) {
628,352✔
549
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
550
    return;
×
551
  }
552
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
628,352✔
553
  if (contLen < 0) {
628,352✔
554
    rpcFreeCont(pHead);
×
555
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
556
    return;
×
557
  }
558

559
  SRpcMsg rpcMsg = {.pCont = pHead,
628,352✔
560
                    .contLen = contLen,
561
                    .msgType = TDMT_MND_KEY_SYNC,
562
                    .info.ahandle = 0,
563
                    .info.notFreeAhandle = 1,
564
                    .info.refId = 0,
565
                    .info.noResp = 0,
566
                    .info.handle = 0};
567
  SRpcMsg rpcRsp = {0};
628,352✔
568

569
  SEpSet epSet = {0};
628,352✔
570
  int8_t epUpdated = 0;
628,352✔
571
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
628,352✔
572

573
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
628,352✔
574
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
628,352✔
575
  if (code != 0) {
628,352✔
576
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
7,566✔
577
    return;
7,566✔
578
  }
579
  if (rpcRsp.code != 0) {
620,786✔
580
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
×
581
    return;
×
582
  }
583
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
620,786✔
584
  if (code != 0) {
620,786✔
585
    dError("failed to process key sync rsp since %s", tstrerror(code));
×
586
    return;
×
587
  }
588
}
589

590
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
653,339✔
591
  int32_t    code = 0;
653,339✔
592
  SConfigReq req = {0};
653,339✔
593

594
  req.cver = tsdmConfigVersion;
653,339✔
595
  req.forceReadConfig = true;
653,339✔
596
  req.array = taosGetGlobalCfg(tsCfg);
653,339✔
597
  dDebug("send config req to mnode, configVersion:%d", req.cver);
653,339✔
598

599
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
653,339✔
600
  if (contLen < 0) {
653,339✔
601
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
602
    return;
×
603
  }
604

605
  void *pHead = rpcMallocCont(contLen);
653,339✔
606
  if (pHead == NULL) {
653,339✔
607
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
608
    return;
×
609
  }
610
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
653,339✔
611
  if (contLen < 0) {
653,339✔
612
    rpcFreeCont(pHead);
×
613
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
614
    return;
×
615
  }
616

617
  SRpcMsg rpcMsg = {.pCont = pHead,
653,339✔
618
                    .contLen = contLen,
619
                    .msgType = TDMT_MND_CONFIG,
620
                    .info.ahandle = 0,
621
                    .info.notFreeAhandle = 1,
622
                    .info.refId = 0,
623
                    .info.noResp = 0,
624
                    .info.handle = 0};
625
  SRpcMsg rpcRsp = {0};
653,339✔
626

627
  SEpSet epSet = {0};
653,339✔
628
  int8_t epUpdated = 0;
653,339✔
629
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
653,339✔
630

631
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
653,339✔
632
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
653,339✔
633
  if (code != 0) {
653,339✔
634
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
4,634✔
635
    return;
4,634✔
636
  }
637
  if (rpcRsp.code != 0) {
648,705✔
638
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
639
    return;
×
640
  }
641
  dmProcessConfigRsp(pMgmt, &rpcRsp);
648,705✔
642
}
643

644
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
52,543,151✔
645
  dDebug("begin to get dnode info");
52,543,151✔
646
  SDnodeData dnodeData = {0};
52,543,151✔
647
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
52,543,151✔
648
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
52,543,151✔
649
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
52,543,151✔
650
  dnodeData.clusterId = pMgmt->pData->clusterId;
52,543,151✔
651
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
52,543,151✔
652
  dnodeData.updateTime = pMgmt->pData->updateTime;
52,543,151✔
653
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
52,543,151✔
654
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
52,543,151✔
655

656
  dDebug("begin to get vnode loads");
52,543,151✔
657
  SMonVloadInfo vinfo = {0};
52,543,151✔
658
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
52,543,151✔
659

660
  dDebug("begin to get mnode loads");
52,543,151✔
661
  SMonMloadInfo minfo = {0};
52,543,151✔
662
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
52,543,151✔
663

664
  dDebug("begin to lock status info");
52,543,151✔
665
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
52,543,151✔
666
    dError("failed to lock status info lock");
×
667
    return;
×
668
  }
669
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
52,543,151✔
670
  tsDnodeData.dnodeId = dnodeData.dnodeId;
52,543,151✔
671
  tsDnodeData.clusterId = dnodeData.clusterId;
52,543,151✔
672
  tsDnodeData.rebootTime = dnodeData.rebootTime;
52,543,151✔
673
  tsDnodeData.updateTime = dnodeData.updateTime;
52,543,151✔
674
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
52,543,151✔
675

676
  if (tsVinfo.pVloads == NULL) {
52,543,151✔
677
    tsVinfo.pVloads = vinfo.pVloads;
51,197,991✔
678
    vinfo.pVloads = NULL;
51,197,991✔
679
  } else {
680
    taosArrayDestroy(vinfo.pVloads);
1,345,160✔
681
    vinfo.pVloads = NULL;
1,345,160✔
682
  }
683

684
  tsMLoad = minfo.load;
52,543,151✔
685

686
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
52,543,151✔
687
    dError("failed to unlock status info lock");
×
688
    return;
×
689
  }
690
}
691

692
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
693
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
694
  if (contLen < 0) {
×
695
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
696
    return;
×
697
  }
698
  void *pHead = rpcMallocCont(contLen);
×
699
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
700
  if (contLen < 0) {
×
701
    rpcFreeCont(pHead);
×
702
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
703
    return;
×
704
  }
705

706
  SRpcMsg rpcMsg = {.pCont = pHead,
×
707
                    .contLen = contLen,
708
                    .msgType = TDMT_MND_NOTIFY,
709
                    .info.ahandle = 0,
710
                    .info.notFreeAhandle = 1,
711
                    .info.refId = 0,
712
                    .info.noResp = 1,
713
                    .info.handle = 0};
714

715
  SEpSet epSet = {0};
×
716
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
717
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
718
    dError("failed to send notify req");
×
719
  }
720
}
721

722
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
723
  dError("auth rsp is received, but not supported yet");
×
724
  return 0;
×
725
}
726

727
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
728
  dError("grant rsp is received, but not supported yet");
×
729
  return 0;
×
730
}
731

732
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
95,971✔
733
  int32_t       code = 0;
95,971✔
734
  SDCfgDnodeReq cfgReq = {0};
95,971✔
735
  SConfig      *pCfg = taosGetCfg();
95,971✔
736
  SConfigItem  *pItem = NULL;
95,971✔
737

738
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
95,971✔
739
    return TSDB_CODE_INVALID_MSG;
×
740
  }
741
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
95,971✔
742
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,605✔
743
  }
744

745
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
92,366✔
746

747
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
92,366✔
748
  if (code != 0) {
92,366✔
749
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
209✔
750
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
209✔
751
      return TSDB_CODE_SUCCESS;
209✔
752
    } else {
753
      return code;
×
754
    }
755
  }
756
  if (pItem == NULL) {
92,157✔
757
    return TSDB_CODE_CFG_NOT_FOUND;
×
758
  }
759

760
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
92,157✔
761
    char value[10] = {0};
×
762
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
763
      tsSyncTimeout = 0;
×
764
    }
765

766
    if (tsSyncTimeout > 0) {
×
767
      SConfigItem *pItemTmp = NULL;
×
768
      char         tmp[10] = {0};
×
769

770
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
×
771
      TAOS_CHECK_RETURN(
×
772
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
773
      if (pItemTmp == NULL) {
×
774
        return TSDB_CODE_CFG_NOT_FOUND;
×
775
      }
776

777
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
×
778
      TAOS_CHECK_RETURN(
×
779
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
780
      if (pItemTmp == NULL) {
×
781
        return TSDB_CODE_CFG_NOT_FOUND;
×
782
      }
783
      TAOS_CHECK_RETURN(
×
784
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
785
      if (pItemTmp == NULL) {
×
786
        return TSDB_CODE_CFG_NOT_FOUND;
×
787
      }
788

789
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
790
      TAOS_CHECK_RETURN(
×
791
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
792
      if (pItemTmp == NULL) {
×
793
        return TSDB_CODE_CFG_NOT_FOUND;
×
794
      }
795
      TAOS_CHECK_RETURN(
×
796
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
797
      if (pItemTmp == NULL) {
×
798
        return TSDB_CODE_CFG_NOT_FOUND;
×
799
      }
800
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
801
      if (pItemTmp == NULL) {
×
802
        return TSDB_CODE_CFG_NOT_FOUND;
×
803
      }
804

805
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
806
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
807
      if (pItemTmp == NULL) {
×
808
        return TSDB_CODE_CFG_NOT_FOUND;
×
809
      }
810

811
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
812
      TAOS_CHECK_RETURN(
×
813
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
814
      if (pItemTmp == NULL) {
×
815
        return TSDB_CODE_CFG_NOT_FOUND;
×
816
      }
817
      TAOS_CHECK_RETURN(
×
818
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
819
      if (pItemTmp == NULL) {
×
820
        return TSDB_CODE_CFG_NOT_FOUND;
×
821
      }
822
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
823
      if (pItemTmp == NULL) {
×
824
        return TSDB_CODE_CFG_NOT_FOUND;
×
825
      }
826

827
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
828
            tsSyncTimeout);
829
    }
830
  }
831

832
  if (!isConifgItemLazyMode(pItem)) {
92,157✔
833
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
91,505✔
834

835
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
91,505✔
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
838
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
839

840
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
841
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
842
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
843
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
844

845
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
846
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
847
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
848

849
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
850
            tsSyncTimeout);
851
    }
852
  }
853

854
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
92,157✔
855
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
15,822✔
856
    if (code != TSDB_CODE_SUCCESS) {
15,822✔
857
      dError("failed to persist global config since %s", tstrerror(code));
×
858
    }
859
  } else {
860
    code = taosPersistLocalConfig(pMgmt->path);
76,335✔
861
    if (code != TSDB_CODE_SUCCESS) {
76,335✔
862
      dError("failed to persist local config since %s", tstrerror(code));
×
863
    }
864
  }
865

866
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
92,157✔
867
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
868

869
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
870
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
871
  }
872

873
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
92,157✔
874
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
92,157✔
875
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
876
  }
877

878
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
92,157✔
879
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
92,157✔
880
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
881
  }
882

883
  if (cfgReq.version > 0) {
92,157✔
884
    tsdmConfigVersion = cfgReq.version;
23,000✔
885
  }
886
  return code;
92,157✔
887
}
888

889
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
179✔
890
#ifdef TD_ENTERPRISE
891
  int32_t       code = 0;
179✔
892
  SDCfgDnodeReq cfgReq = {0};
179✔
893
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
179✔
894
    code = TSDB_CODE_INVALID_MSG;
×
895
    goto _exit;
×
896
  }
897

898
  code = dmUpdateEncryptKey(cfgReq.value, true);
179✔
899
  if (code == 0) {
179✔
900
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
179✔
901
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
179✔
902
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
179✔
903
  }
904

905
_exit:
179✔
906
  pMsg->code = code;
179✔
907
  pMsg->info.rsp = NULL;
179✔
908
  pMsg->info.rspLen = 0;
179✔
909
  return code;
179✔
910
#else
911
  return 0;
912
#endif
913
}
914

915
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
916
// Verification plaintext used to validate encryption keys
917
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
918

919
// Save key verification file with encrypted plaintext for each key
920
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,215✔
921
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
922
                                     int32_t metaAlgorithm) {
923
  char    verifyFile[PATH_MAX] = {0};
2,215✔
924
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,215✔
925
                            TD_DIRSEP, TD_DIRSEP);
926
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,215✔
927
    dError("failed to build key verification file path");
×
928
    return TSDB_CODE_OUT_OF_BUFFER;
×
929
  }
930

931
  int32_t     code = 0;
2,215✔
932
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
2,215✔
933
  int32_t     plaintextLen = strlen(plaintext);
2,215✔
934

935
  // Array of keys and their algorithms
936
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
2,215✔
937
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
2,215✔
938
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
2,215✔
939

940
  // Calculate total buffer size
941
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
2,215✔
942
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
2,215✔
943
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
2,215✔
944
  int32_t totalSize = headerSize + perKeySize * 5;
2,215✔
945

946
  // Allocate buffer for all data
947
  char *buffer = taosMemoryMalloc(totalSize);
2,215✔
948
  if (buffer == NULL) {
2,215✔
949
    dError("failed to allocate memory for key verification buffer");
×
950
    return terrno;
×
951
  }
952

953
  char *ptr = buffer;
2,215✔
954

955
  // Write magic number and version to buffer
956
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
2,215✔
957
  uint16_t version = 1;
2,215✔
958
  memcpy(ptr, &magic, sizeof(magic));
2,215✔
959
  ptr += sizeof(magic);
2,215✔
960
  memcpy(ptr, &version, sizeof(version));
2,215✔
961
  ptr += sizeof(version);
2,215✔
962

963
  // Encrypt all keys and write to buffer
964
  char paddedPlaintext[512] = {0};
2,215✔
965
  memcpy(paddedPlaintext, plaintext, plaintextLen);
2,215✔
966

967
  for (int i = 0; i < 5; i++) {
13,290✔
968
    char encrypted[512] = {0};
11,075✔
969

970
    // Encrypt the verification plaintext with this key using CBC
971
    SCryptOpts opts = {0};
11,075✔
972
    opts.len = encryptedLen;
11,075✔
973
    opts.source = paddedPlaintext;
11,075✔
974
    opts.result = encrypted;
11,075✔
975
    opts.unitLen = 16;
11,075✔
976
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
11,075✔
977
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
11,075✔
978

979
    int32_t count = CBC_Encrypt(&opts);
11,075✔
980
    if (count != opts.len) {
11,075✔
981
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
982
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
983
             tstrerror(code));
984
      taosMemoryFree(buffer);
×
985
      return code;
×
986
    }
987

988
    // Write to buffer: algorithm + encrypted length + encrypted data
989
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
11,075✔
990
    ptr += sizeof(int32_t);
11,075✔
991
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
11,075✔
992
    ptr += sizeof(int32_t);
11,075✔
993
    memcpy(ptr, encrypted, encryptedLen);
11,075✔
994
    ptr += encryptedLen;
11,075✔
995

996
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
11,075✔
997
  }
998

999
  // Write all data to file in one operation
1000
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
2,215✔
1001
  if (pFile == NULL) {
2,215✔
1002
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
1003
    taosMemoryFree(buffer);
×
1004
    return TSDB_CODE_FILE_CORRUPTED;
×
1005
  }
1006

1007
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
2,215✔
1008
  (void)taosCloseFile(&pFile);
2,215✔
1009
  taosMemoryFree(buffer);
2,215✔
1010

1011
  if (written != totalSize) {
2,215✔
1012
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1013
    return TSDB_CODE_FILE_CORRUPTED;
×
1014
  }
1015

1016
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
2,215✔
1017
  return 0;
2,215✔
1018
}
1019

1020
// Verify all encryption keys by decrypting and comparing with original plaintext
1021
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
1,947✔
1022
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1023
                                      int32_t metaAlgorithm) {
1024
  char    verifyFile[PATH_MAX] = {0};
1,947✔
1025
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
1,947✔
1026
                            TD_DIRSEP, TD_DIRSEP);
1027
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
1,947✔
1028
    dError("failed to build key verification file path");
×
1029
    return TSDB_CODE_OUT_OF_BUFFER;
×
1030
  }
1031

1032
  // Get file size
1033
  int64_t fileSize = 0;
1,947✔
1034
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
1,947✔
1035
    // File doesn't exist, create it with current keys
1036
    dInfo("key verification file not found, creating new one");
1,947✔
1037
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
1,947✔
1038
  }
1039

1040
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1041
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1042
    return TSDB_CODE_FILE_CORRUPTED;
×
1043
  }
1044

1045
  // Allocate buffer and read entire file
1046
  char *buffer = taosMemoryMalloc(fileSize);
×
1047
  if (buffer == NULL) {
×
1048
    dError("failed to allocate memory for key verification buffer");
×
1049
    return terrno;
×
1050
  }
1051

1052
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1053
  if (pFile == NULL) {
×
1054
    dError("failed to open key verification file:%s", verifyFile);
×
1055
    taosMemoryFree(buffer);
×
1056
    return TSDB_CODE_FILE_CORRUPTED;
×
1057
  }
1058

1059
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1060
  (void)taosCloseFile(&pFile);
×
1061

1062
  if (bytesRead != fileSize) {
×
1063
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1064
    taosMemoryFree(buffer);
×
1065
    return TSDB_CODE_FILE_CORRUPTED;
×
1066
  }
1067

1068
  int32_t     code = 0;
×
1069
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1070
  int32_t     plaintextLen = strlen(plaintext);
×
1071
  const char *ptr = buffer;
×
1072

1073
  // Parse and verify header
1074
  uint32_t magic = 0;
×
1075
  uint16_t version = 0;
×
1076
  memcpy(&magic, ptr, sizeof(magic));
×
1077
  ptr += sizeof(magic);
×
1078
  memcpy(&version, ptr, sizeof(version));
×
1079
  ptr += sizeof(version);
×
1080

1081
  if (magic != 0x544B5659) {
×
1082
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1083
    taosMemoryFree(buffer);
×
1084
    return TSDB_CODE_FILE_CORRUPTED;
×
1085
  }
1086

1087
  // Array of keys and their algorithms
1088
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1089
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1090
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1091

1092
  // Verify each key from buffer
1093
  for (int i = 0; i < 5; i++) {
×
1094
    // Check if we have enough data remaining
1095
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1096
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1097
      taosMemoryFree(buffer);
×
1098
      return TSDB_CODE_FILE_CORRUPTED;
×
1099
    }
1100

1101
    int32_t savedAlgo = 0;
×
1102
    int32_t encryptedLen = 0;
×
1103

1104
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1105
    ptr += sizeof(int32_t);
×
1106
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1107
    ptr += sizeof(int32_t);
×
1108

1109
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1110
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1111
      taosMemoryFree(buffer);
×
1112
      return TSDB_CODE_FILE_CORRUPTED;
×
1113
    }
1114

1115
    if (ptr - buffer + encryptedLen > fileSize) {
×
1116
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1117
      taosMemoryFree(buffer);
×
1118
      return TSDB_CODE_FILE_CORRUPTED;
×
1119
    }
1120

1121
    uint8_t encrypted[512] = {0};
×
1122
    memcpy(encrypted, ptr, encryptedLen);
×
1123
    ptr += encryptedLen;
×
1124

1125
    // Decrypt with current key using CBC
1126
    char decrypted[512] = {0};
×
1127

1128
    SCryptOpts opts = {0};
×
1129
    opts.len = encryptedLen;
×
1130
    opts.source = (char *)encrypted;
×
1131
    opts.result = decrypted;
×
1132
    opts.unitLen = 16;
×
1133
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
1134
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1135

1136
    int32_t count = CBC_Decrypt(&opts);
×
1137
    if (count != opts.len) {
×
1138
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1139
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1140
             count, opts.len, tstrerror(code));
1141
      taosMemoryFree(buffer);
×
1142
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1143
    }
1144

1145
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1146
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1147
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1148
      taosMemoryFree(buffer);
×
1149
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1150
    }
1151

1152
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1153
  }
1154

1155
  taosMemoryFree(buffer);
×
1156
  dInfo("all encryption keys verified successfully");
×
1157
  return 0;
×
1158
}
1159

1160
// Public API: Verify and initialize encryption keys at startup
1161
int32_t dmVerifyAndInitEncryptionKeys(void) {
650,201✔
1162
  // Skip verification in dump sdb mode (taosd -s)
1163
  if (tsSkipKeyCheckMode) {
650,201✔
1164
    dInfo("skip encryption key verification in some special check mode");
1,568✔
1165
    return 0;
1,568✔
1166
  }
1167

1168
  // Check if encryption keys are loaded
1169
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
648,633✔
1170
    dDebug("encryption keys not loaded, skipping verification");
646,686✔
1171
    return 0;
646,686✔
1172
  }
1173

1174
  // Get key file paths
1175
  char    masterKeyFile[PATH_MAX] = {0};
1,947✔
1176
  char    derivedKeyFile[PATH_MAX] = {0};
1,947✔
1177
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
1,947✔
1178
                            TD_DIRSEP, TD_DIRSEP);
1179
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
1,947✔
1180
    dError("failed to build master key file path");
×
1181
    return TSDB_CODE_OUT_OF_BUFFER;
×
1182
  }
1183

1184
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
1,947✔
1185
                    TD_DIRSEP, TD_DIRSEP);
1186
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
1,947✔
1187
    dError("failed to build derived key file path");
×
1188
    return TSDB_CODE_OUT_OF_BUFFER;
×
1189
  }
1190

1191
  // Load encryption keys
1192
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
1,947✔
1193
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
1,947✔
1194
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
1,947✔
1195
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
1,947✔
1196
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
1,947✔
1197
  int32_t algorithm = 0;
1,947✔
1198
  int32_t cfgAlgorithm = 0;
1,947✔
1199
  int32_t metaAlgorithm = 0;
1,947✔
1200
  int32_t fileVersion = 0;
1,947✔
1201
  int32_t keyVersion = 0;
1,947✔
1202
  int64_t createTime = 0;
1,947✔
1203
  int64_t svrKeyUpdateTime = 0;
1,947✔
1204
  int64_t dbKeyUpdateTime = 0;
1,947✔
1205

1206
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
1,947✔
1207
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1208
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1209
  if (code != 0) {
1,947✔
1210
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1211
    return code;
×
1212
  }
1213

1214
  // Verify all keys
1215
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
1,947✔
1216
  if (code != 0) {
1,947✔
1217
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1218
    return code;
×
1219
  }
1220

1221
  dInfo("encryption keys verified and initialized successfully");
1,947✔
1222
  return 0;
1,947✔
1223
}
1224
#else
1225
int32_t dmVerifyAndInitEncryptionKeys(void) {
1226
  // Community edition or no TaosK support
1227
  return 0;
1228
}
1229
#endif
1230

1231
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1232
static int32_t dmUpdateSvrKey(const char *newKey) {
134✔
1233
  if (newKey == NULL || newKey[0] == '\0') {
134✔
1234
    dError("invalid new SVR_KEY, key is empty");
×
1235
    return TSDB_CODE_INVALID_PARA;
×
1236
  }
1237

1238
  char masterKeyFile[PATH_MAX] = {0};
134✔
1239
  char derivedKeyFile[PATH_MAX] = {0};
134✔
1240

1241
  // Build path to key files
1242
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
134✔
1243
                            TD_DIRSEP, TD_DIRSEP);
1244
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
134✔
1245
    dError("failed to build master key file path");
×
1246
    return TSDB_CODE_OUT_OF_BUFFER;
×
1247
  }
1248

1249
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
134✔
1250
                    TD_DIRSEP, TD_DIRSEP);
1251
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
134✔
1252
    dError("failed to build derived key file path");
×
1253
    return TSDB_CODE_OUT_OF_BUFFER;
×
1254
  }
1255

1256
  // Load current keys
1257
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1258
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1259
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1260
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1261
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1262
  int32_t algorithm = 0;
134✔
1263
  int32_t cfgAlgorithm = 0;
134✔
1264
  int32_t metaAlgorithm = 0;
134✔
1265
  int32_t fileVersion = 0;
134✔
1266
  int32_t keyVersion = 0;
134✔
1267
  int64_t createTime = 0;
134✔
1268
  int64_t svrKeyUpdateTime = 0;
134✔
1269
  int64_t dbKeyUpdateTime = 0;
134✔
1270

1271
  int32_t code =
1272
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
134✔
1273
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1274
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1275
  if (code != 0) {
134✔
1276
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1277
    return code;
×
1278
  }
1279

1280
  // Update SVR_KEY
1281
  int64_t now = taosGetTimestampMs();
134✔
1282
  int32_t newKeyVersion = keyVersion + 1;
134✔
1283

1284
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
134✔
1285
  tstrncpy(svrKey, newKey, sizeof(svrKey));
134✔
1286
  svrKeyUpdateTime = now;
134✔
1287

1288
  // Save updated keys (use algorithm for all keys for backward compatibility)
1289
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
134✔
1290
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1291
  if (code != 0) {
134✔
1292
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1293
    return code;
×
1294
  }
1295

1296
  // Update key verification file with new SVR_KEY
1297
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
134✔
1298
  if (code != 0) {
134✔
1299
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1300
    // Don't fail the operation if verification file update fails
1301
  }
1302

1303
  // Update global variables
1304
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
134✔
1305
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
134✔
1306
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
134✔
1307
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
134✔
1308
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
134✔
1309
  tsEncryptAlgorithmType = algorithm;
134✔
1310
  tsEncryptFileVersion = fileVersion;
134✔
1311
  tsEncryptKeyVersion = newKeyVersion;
134✔
1312
  tsEncryptKeyCreateTime = createTime;
134✔
1313
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
134✔
1314
  tsDbKeyUpdateTime = dbKeyUpdateTime;
134✔
1315

1316
  // Update encryption key status for backward compatibility
1317
  int keyLen = strlen(tsDataKey);
134✔
1318
  if (keyLen > ENCRYPT_KEY_LEN) {
134✔
1319
    keyLen = ENCRYPT_KEY_LEN;
×
1320
  }
1321
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
134✔
1322
  memcpy(tsEncryptKey, tsDataKey, keyLen);
134✔
1323
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
134✔
1324
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
134✔
1325

1326
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
134✔
1327
  return 0;
134✔
1328
}
1329

1330
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
1331
  if (days < 0) {
×
1332
    dError("invalid days value:%d, must be >= 0", days);
×
1333
    return TSDB_CODE_INVALID_PARA;
×
1334
  }
1335

1336
  if (strategy == NULL || strategy[0] == '\0') {
×
1337
    dError("invalid strategy, strategy is empty");
×
1338
    return TSDB_CODE_INVALID_PARA;
×
1339
  }
1340

1341
  // Validate strategy value
1342
  if (strcmp(strategy, "ALARM") != 0) {
×
1343
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1344
  }
1345

1346
  // Update global variables directly
1347
  tsKeyExpirationDays = days;
×
1348
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1349

1350
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
1351
  return 0;
×
1352
}
1353

1354
static int32_t dmUpdateDbKey(const char *newKey) {
134✔
1355
  if (newKey == NULL || newKey[0] == '\0') {
134✔
1356
    dError("invalid new DB_KEY, key is empty");
×
1357
    return TSDB_CODE_INVALID_PARA;
×
1358
  }
1359

1360
  char masterKeyFile[PATH_MAX] = {0};
134✔
1361
  char derivedKeyFile[PATH_MAX] = {0};
134✔
1362

1363
  // Build path to key files
1364
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
134✔
1365
                            TD_DIRSEP, TD_DIRSEP);
1366
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
134✔
1367
    dError("failed to build master key file path");
×
1368
    return TSDB_CODE_OUT_OF_BUFFER;
×
1369
  }
1370

1371
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
134✔
1372
                    TD_DIRSEP, TD_DIRSEP);
1373
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
134✔
1374
    dError("failed to build derived key file path");
×
1375
    return TSDB_CODE_OUT_OF_BUFFER;
×
1376
  }
1377

1378
  // Load current keys
1379
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1380
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1381
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1382
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1383
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
134✔
1384
  int32_t algorithm = 0;
134✔
1385
  int32_t cfgAlgorithm = 0;
134✔
1386
  int32_t metaAlgorithm = 0;
134✔
1387
  int32_t fileVersion = 0;
134✔
1388
  int32_t keyVersion = 0;
134✔
1389
  int64_t createTime = 0;
134✔
1390
  int64_t svrKeyUpdateTime = 0;
134✔
1391
  int64_t dbKeyUpdateTime = 0;
134✔
1392

1393
  int32_t code =
1394
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
134✔
1395
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1396
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1397
  if (code != 0) {
134✔
1398
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1399
    return code;
×
1400
  }
1401

1402
  // Update DB_KEY
1403
  int64_t now = taosGetTimestampMs();
134✔
1404
  int32_t newKeyVersion = keyVersion + 1;
134✔
1405

1406
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
134✔
1407
  tstrncpy(dbKey, newKey, sizeof(dbKey));
134✔
1408
  dbKeyUpdateTime = now;
134✔
1409

1410
  // Save updated keys (use algorithm for all keys for backward compatibility)
1411
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
134✔
1412
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1413
  if (code != 0) {
134✔
1414
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1415
    return code;
×
1416
  }
1417

1418
  // Update key verification file with new DB_KEY
1419
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
134✔
1420
  if (code != 0) {
134✔
1421
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1422
    // Don't fail the operation if verification file update fails
1423
  }
1424

1425
  // Update global variables
1426
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
134✔
1427
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
134✔
1428
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
134✔
1429
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
134✔
1430
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
134✔
1431
  tsEncryptAlgorithmType = algorithm;
134✔
1432
  tsEncryptFileVersion = fileVersion;
134✔
1433
  tsEncryptKeyVersion = newKeyVersion;
134✔
1434
  tsEncryptKeyCreateTime = createTime;
134✔
1435
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
134✔
1436
  tsDbKeyUpdateTime = dbKeyUpdateTime;
134✔
1437

1438
  // Update encryption key status for backward compatibility
1439
  int keyLen = strlen(tsDataKey);
134✔
1440
  if (keyLen > ENCRYPT_KEY_LEN) {
134✔
1441
    keyLen = ENCRYPT_KEY_LEN;
×
1442
  }
1443
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
134✔
1444
  memcpy(tsEncryptKey, tsDataKey, keyLen);
134✔
1445
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
134✔
1446
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
134✔
1447

1448
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
134✔
1449
  return 0;
134✔
1450
}
1451
#endif
1452

1453
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
268✔
1454
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1455
  int32_t              code = 0;
268✔
1456
  SMAlterEncryptKeyReq alterKeyReq = {0};
268✔
1457
  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &alterKeyReq) != 0) {
268✔
1458
    code = TSDB_CODE_INVALID_MSG;
×
1459
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
×
1460
    goto _exit;
×
1461
  }
1462

1463
  dInfo("received alter encrypt key req, keyType:%d", alterKeyReq.keyType);
268✔
1464

1465
  // Update the specified key (svr_key or db_key)
1466
  if (alterKeyReq.keyType == 0) {
268✔
1467
    // Update SVR_KEY
1468
    code = dmUpdateSvrKey(alterKeyReq.newKey);
134✔
1469
    if (code == 0) {
134✔
1470
      dInfo("successfully updated SVR_KEY");
134✔
1471
    } else {
1472
      dError("failed to update SVR_KEY, since %s", tstrerror(code));
×
1473
    }
1474
  } else if (alterKeyReq.keyType == 1) {
134✔
1475
    // Update DB_KEY
1476
    code = dmUpdateDbKey(alterKeyReq.newKey);
134✔
1477
    if (code == 0) {
134✔
1478
      dInfo("successfully updated DB_KEY");
134✔
1479
    } else {
1480
      dError("failed to update DB_KEY, since %s", tstrerror(code));
×
1481
    }
1482
  } else {
1483
    dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", alterKeyReq.keyType);
×
1484
    code = TSDB_CODE_INVALID_PARA;
×
1485
  }
1486

1487
_exit:
268✔
1488
  tFreeSMAlterEncryptKeyReq(&alterKeyReq);
268✔
1489
  pMsg->code = code;
268✔
1490
  pMsg->info.rsp = NULL;
268✔
1491
  pMsg->info.rspLen = 0;
268✔
1492
  return code;
268✔
1493
#else
1494
  dError("encryption key management is only available in enterprise edition");
1495
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1496
  pMsg->info.rsp = NULL;
1497
  pMsg->info.rspLen = 0;
1498
  return TSDB_CODE_OPS_NOT_SUPPORT;
1499
#endif
1500
}
1501

1502
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1503
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1504
  int32_t                 code = 0;
×
1505
  SMAlterKeyExpirationReq alterReq = {0};
×
1506
  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
×
1507
    code = TSDB_CODE_INVALID_MSG;
×
1508
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
×
1509
    goto _exit;
×
1510
  }
1511

1512
  dInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);
×
1513

1514
  // Update key expiration configuration
1515
  code = dmUpdateKeyExpiration(alterReq.days, alterReq.strategy);
×
1516
  if (code == 0) {
×
1517
    dInfo("successfully updated key expiration: %d days, strategy: %s", alterReq.days, alterReq.strategy);
×
1518
  } else {
1519
    dError("failed to update key expiration, since %s", tstrerror(code));
×
1520
  }
1521

1522
_exit:
×
1523
  tFreeSMAlterKeyExpirationReq(&alterReq);
×
1524
  pMsg->code = code;
×
1525
  pMsg->info.rsp = NULL;
×
1526
  pMsg->info.rspLen = 0;
×
1527
  return code;
×
1528
#else
1529
  dError("key expiration management is only available in enterprise edition");
1530
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1531
  pMsg->info.rsp = NULL;
1532
  pMsg->info.rspLen = 0;
1533
  return TSDB_CODE_OPS_NOT_SUPPORT;
1534
#endif
1535
}
1536

1537
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1538
  int32_t code = 0;
×
1539
  int32_t lino = 0;
×
1540
  SMsgCb *msgCb = &pMgmt->msgCb;
×
1541
  void *pTransCli = msgCb->clientRpc;
×
1542
  void *pTransStatus = msgCb->statusRpc;  
×
1543
  void *pTransSync = msgCb->syncRpc; 
×
1544
  void *pTransServer = msgCb->serverRpc;
×
1545

1546
  code = rpcReloadTlsConfig(pTransServer, TAOS_CONN_SERVER);
×
1547
  if (code != 0) {
×
1548
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(code));
×
1549
    goto _error;
×
1550
  }
1551

1552
  code = rpcReloadTlsConfig(pTransCli, TAOS_CONN_CLIENT);
×
1553
  if (code != 0) {
×
1554
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(code));
×
1555
    goto _error;
×
1556
  }
1557

1558
  code = rpcReloadTlsConfig(pTransStatus, TAOS_CONN_CLIENT);
×
1559
  if (code != 0) {
×
1560
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(code));
×
1561
    goto _error;
×
1562
  }
1563

1564
  code = rpcReloadTlsConfig(pTransSync, TAOS_CONN_CLIENT);
×
1565
  if (code != 0) {
×
1566
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(code));
×
1567
    goto _error;
×
1568
  }
1569

1570
_error:
×
1571
  
1572
  return code;
×
1573
}
1574

1575
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
170✔
1576
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
170✔
1577
  pStatus->details[0] = 0;
170✔
1578

1579
  SMonMloadInfo minfo = {0};
170✔
1580
  (*pMgmt->getMnodeLoadsFp)(&minfo);
170✔
1581
  if (minfo.isMnode &&
170✔
1582
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
170✔
1583
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1584
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
1585
    return;
×
1586
  }
1587

1588
  SMonVloadInfo vinfo = {0};
170✔
1589
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
170✔
1590
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
510✔
1591
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
340✔
1592
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
340✔
1593
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1594
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
1595
               syncStr(pLoad->syncState));
×
1596
      break;
×
1597
    }
1598
  }
1599

1600
  taosArrayDestroy(vinfo.pVloads);
170✔
1601
}
1602

1603
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
170✔
1604
  int32_t code = 0;
170✔
1605
  dDebug("server run status req is received");
170✔
1606
  SServerStatusRsp statusRsp = {0};
170✔
1607
  dmGetServerRunStatus(pMgmt, &statusRsp);
170✔
1608

1609
  pMsg->info.rsp = NULL;
170✔
1610
  pMsg->info.rspLen = 0;
170✔
1611

1612
  SRpcMsg rspMsg = {.info = pMsg->info};
170✔
1613
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
170✔
1614
  if (rspLen < 0) {
170✔
1615
    return TSDB_CODE_OUT_OF_MEMORY;
×
1616
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1617
    // return rspMsg.code;
1618
  }
1619

1620
  void *pRsp = rpcMallocCont(rspLen);
170✔
1621
  if (pRsp == NULL) {
170✔
1622
    return terrno;
×
1623
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1624
    // return rspMsg.code;
1625
  }
1626

1627
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
170✔
1628
  if (rspLen < 0) {
170✔
1629
    return TSDB_CODE_INVALID_MSG;
×
1630
  }
1631

1632
  pMsg->info.rsp = pRsp;
170✔
1633
  pMsg->info.rspLen = rspLen;
170✔
1634
  return 0;
170✔
1635
}
1636

1637
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
24,264✔
1638
  int32_t code = 0;
24,264✔
1639

1640
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
24,264✔
1641
  if (pBlock == NULL) {
24,264✔
1642
    return terrno;
×
1643
  }
1644

1645
  size_t size = 0;
24,264✔
1646

1647
  const SSysTableMeta *pMeta = NULL;
24,264✔
1648
  getInfosDbMeta(&pMeta, &size);
24,264✔
1649

1650
  int32_t index = 0;
24,264✔
1651
  for (int32_t i = 0; i < size; ++i) {
509,544✔
1652
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
509,544✔
1653
      index = i;
24,264✔
1654
      break;
24,264✔
1655
    }
1656
  }
1657

1658
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
24,264✔
1659
  if (pBlock->pDataBlock == NULL) {
24,264✔
1660
    code = terrno;
×
1661
    goto _exit;
×
1662
  }
1663

1664
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
169,848✔
1665
    SColumnInfoData colInfoData = {0};
145,584✔
1666
    colInfoData.info.colId = i + 1;
145,584✔
1667
    colInfoData.info.type = pMeta[index].schema[i].type;
145,584✔
1668
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
145,584✔
1669
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
291,168✔
1670
      code = terrno;
×
1671
      goto _exit;
×
1672
    }
1673
  }
1674

1675
  pBlock->info.hasVarCol = true;
24,264✔
1676
_exit:
24,264✔
1677
  if (code != 0) {
24,264✔
1678
    blockDataDestroy(pBlock);
×
1679
  } else {
1680
    *ppBlock = pBlock;
24,264✔
1681
  }
1682
  return code;
24,264✔
1683
}
1684

1685
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
24,264✔
1686
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);
24,264✔
1687
  if (code != 0) {
24,264✔
1688
    return code;
×
1689
  }
1690

1691
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
24,264✔
1692
  if (pColInfo == NULL) {
24,264✔
1693
    return TSDB_CODE_OUT_OF_RANGE;
×
1694
  }
1695

1696
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
24,264✔
1697
}
1698

1699
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
24,264✔
1700
  int32_t           size = 0;
24,264✔
1701
  int32_t           rowsRead = 0;
24,264✔
1702
  int32_t           code = 0;
24,264✔
1703
  SRetrieveTableReq retrieveReq = {0};
24,264✔
1704
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
24,264✔
1705
    return TSDB_CODE_INVALID_MSG;
×
1706
  }
1707
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
24,264✔
1708
#if 0
1709
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
1710
    code = TSDB_CODE_MND_NO_RIGHTS;
1711
    return code;
1712
  }
1713
#endif
1714
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
24,264✔
1715
    return TSDB_CODE_INVALID_MSG;
×
1716
  }
1717

1718
  SSDataBlock *pBlock = NULL;
24,264✔
1719
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
24,264✔
1720
    return code;
×
1721
  }
1722

1723
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
24,264✔
1724
  if (code != 0) {
24,264✔
1725
    blockDataDestroy(pBlock);
×
1726
    return code;
×
1727
  }
1728

1729
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
24,264✔
1730
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
24,264✔
1731
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
24,264✔
1732

1733
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
24,264✔
1734
  if (pRsp == NULL) {
24,264✔
1735
    code = terrno;
×
1736
    dError("failed to retrieve data since %s", tstrerror(code));
×
1737
    blockDataDestroy(pBlock);
×
1738
    return code;
×
1739
  }
1740

1741
  char *pStart = pRsp->data;
24,264✔
1742
  *(int32_t *)pStart = htonl(numOfCols);
24,264✔
1743
  pStart += sizeof(int32_t);  // number of columns
24,264✔
1744

1745
  for (int32_t i = 0; i < numOfCols; ++i) {
169,848✔
1746
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
145,584✔
1747
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
145,584✔
1748

1749
    pSchema->bytes = htonl(pColInfo->info.bytes);
145,584✔
1750
    pSchema->colId = htons(pColInfo->info.colId);
145,584✔
1751
    pSchema->type = pColInfo->info.type;
145,584✔
1752

1753
    pStart += sizeof(SSysTableSchema);
145,584✔
1754
  }
1755

1756
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
24,264✔
1757
  if (len < 0) {
24,264✔
1758
    dError("failed to retrieve data since %s", tstrerror(code));
×
1759
    blockDataDestroy(pBlock);
×
1760
    rpcFreeCont(pRsp);
×
1761
    return terrno;
×
1762
  }
1763

1764
  pRsp->numOfRows = htonl(pBlock->info.rows);
24,264✔
1765
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
24,264✔
1766
  pRsp->completed = 1;
24,264✔
1767
  pMsg->info.rsp = pRsp;
24,264✔
1768
  pMsg->info.rspLen = size;
24,264✔
1769
  dDebug("dnode variables retrieve completed");
24,264✔
1770

1771
  blockDataDestroy(pBlock);
24,264✔
1772
  return TSDB_CODE_SUCCESS;
24,264✔
1773
}
1774

1775
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
18,123,202✔
1776
  SMStreamHbRspMsg rsp = {0};
18,123,202✔
1777
  int32_t          code = 0;
18,123,202✔
1778
  SDecoder         decoder;
18,119,319✔
1779
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
18,123,202✔
1780
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
18,123,202✔
1781
  int64_t          currTs = taosGetTimestampMs();
18,123,202✔
1782

1783
  if (pMsg->code) {
18,123,202✔
1784
    return streamHbHandleRspErr(pMsg->code, currTs);
179,847✔
1785
  }
1786

1787
  tDecoderInit(&decoder, (uint8_t*)msg, len);
17,943,355✔
1788
  code = tDecodeStreamHbRsp(&decoder, &rsp);
17,943,355✔
1789
  if (code < 0) {
17,943,355✔
1790
    code = TSDB_CODE_INVALID_MSG;
×
1791
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
1792
    tDecoderClear(&decoder);
×
1793
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
1794
    return streamHbHandleRspErr(code, currTs);
×
1795
  }
1796

1797
  tDecoderClear(&decoder);
17,943,355✔
1798

1799
  return streamHbProcessRspMsg(&rsp);
17,943,355✔
1800
}
1801

1802

1803
SArray *dmGetMsgHandles() {
650,296✔
1804
  int32_t code = -1;
650,296✔
1805
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
650,296✔
1806
  if (pArray == NULL) {
650,296✔
1807
    return NULL;
×
1808
  }
1809

1810
  // Requests handled by DNODE
1811
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1812
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1813
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1814
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1815
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1816
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1817
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1818
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1819
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1820
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1821
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1822
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1823
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1824
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1825
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1826
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1827
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1828
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1829

1830
  // Requests handled by MNODE
1831
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1832
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1833
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
650,296✔
1834

1835
  code = 0;
650,296✔
1836

1837
_OVER:
650,296✔
1838
  if (code != 0) {
650,296✔
1839
    taosArrayDestroy(pArray);
×
1840
    return NULL;
×
1841
  } else {
1842
    return pArray;
650,296✔
1843
  }
1844
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc