• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5043

29 Apr 2026 11:44AM UTC coverage: 73.107% (-0.06%) from 73.17%
#5043

push

travis-ci

web-flow
feat(statewindow): support multi columns (#35136)

1563 of 1828 new or added lines in 18 files covered. (85.5%)

7490 existing lines in 148 files now uncovered.

277321 of 379338 relevant lines covered (73.11%)

131116908.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.01
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void     setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId);
35

36
#ifndef TD_ENTERPRISE
37
void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
void getAuditDbNameToken(char *pDb, char *pToken) {}
44
#endif
45

46
extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId);
47

48
#ifndef TD_ENTERPRISE
49
void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {}
50
#endif
51

52
SMonVloadInfo tsVinfo = {0};
53
SMnodeLoad    tsMLoad = {0};
54
SDnodeData    tsDnodeData = {0};
55

56
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
2,113,831✔
57
  int32_t code = 0;
2,113,831✔
58
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
2,113,831✔
59
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
520,974✔
60
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
520,974✔
61
    pMgmt->pData->dnodeId = pCfg->dnodeId;
520,974✔
62
    pMgmt->pData->clusterId = pCfg->clusterId;
520,974✔
63
    monSetDnodeId(pCfg->dnodeId);
520,974✔
64
    auditSetDnodeId(pCfg->dnodeId);
520,974✔
65
    code = dmWriteEps(pMgmt->pData);
520,974✔
66
    if (code != 0) {
520,974✔
67
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
1,524✔
68
            tstrerror(code));
69
    }
70
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
520,974✔
71
  }
72
}
2,113,831✔
73

74
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,296,190✔
75
  int32_t code = 0;
2,296,190✔
76
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,296,190✔
77
  if (pMgmt->pData->ipWhiteVer == ver) {
2,296,190✔
78
    if (ver == 0) {
2,295,721✔
79
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,294,963✔
80
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,294,963✔
81
        dError("failed to disable ip white list on dnode");
×
82
      }
83
    }
84
    return;
2,295,721✔
85
  }
86
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
469✔
87

88
  SRetrieveWhiteListReq req = {.ver = oldVer};
469✔
89
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
469✔
90
  if (contLen < 0) {
469✔
91
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
92
    return;
×
93
  }
94
  void *pHead = rpcMallocCont(contLen);
469✔
95
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
469✔
96
  if (contLen < 0) {
469✔
97
    rpcFreeCont(pHead);
×
98
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
99
    return;
×
100
  }
101

102
  SRpcMsg rpcMsg = {.pCont = pHead,
469✔
103
                    .contLen = contLen,
104
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
105
                    .info.ahandle = 0,
106
                    .info.notFreeAhandle = 1,
107
                    .info.refId = 0,
108
                    .info.noResp = 0,
109
                    .info.handle = 0};
110
  SEpSet  epset = {0};
469✔
111

112
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
469✔
113

114
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
469✔
115
  if (code != 0) {
469✔
116
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
117
  }
118
}
119

120

121

122
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,296,190✔
123
  int32_t code = 0;
2,296,190✔
124
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,296,190✔
125
  if (pMgmt->pData->timeWhiteVer == ver) {
2,296,190✔
126
    if (ver == 0) {
2,295,721✔
127
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,294,963✔
128
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,294,963✔
129
        dError("failed to disable time white list on dnode");
×
130
      }
131
    }
132
    return;
2,295,721✔
133
  }
134
  int64_t oldVer = pMgmt->pData->timeWhiteVer;
469✔
135

136
  SRetrieveWhiteListReq req = {.ver = oldVer};
469✔
137
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
469✔
138
  if (contLen < 0) {
469✔
139
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
×
140
    return;
×
141
  }
142
  void *pHead = rpcMallocCont(contLen);
469✔
143
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
469✔
144
  if (contLen < 0) {
469✔
145
    rpcFreeCont(pHead);
×
146
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
×
147
    return;
×
148
  }
149

150
  SRpcMsg rpcMsg = {.pCont = pHead,
469✔
151
                    .contLen = contLen,
152
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
153
                    .info.ahandle = 0,
154
                    .info.notFreeAhandle = 1,
155
                    .info.refId = 0,
156
                    .info.noResp = 0,
157
                    .info.handle = 0};
158
  SEpSet  epset = {0};
469✔
159

160
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
469✔
161

162
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
469✔
163
  if (code != 0) {
469✔
164
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
×
165
  }
166
}
167

168

169

170
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
2,296,190✔
171
  int32_t code = 0;
2,296,190✔
172
  int64_t oldVer = taosAnalyGetVersion();
2,296,190✔
173
  if (oldVer == newVer) return;
2,296,190✔
174
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
175

176
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
177
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
178
  if (contLen < 0) {
×
179
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
180
    return;
×
181
  }
182

183
  void *pHead = rpcMallocCont(contLen);
×
184
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
185
  if (contLen < 0) {
×
186
    rpcFreeCont(pHead);
×
187
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
188
    return;
×
189
  }
190

191
  SRpcMsg rpcMsg = {
×
192
      .pCont = pHead,
193
      .contLen = contLen,
194
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
195
      .info.ahandle = 0,
196
      .info.refId = 0,
197
      .info.noResp = 0,
198
      .info.handle = 0,
199
  };
200
  SEpSet epset = {0};
×
201

202
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
203

204
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
205
  if (code != 0) {
×
206
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
207
  }
208
}
209

210
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
56,365,588✔
211
  const STraceId *trace = &pRsp->info.traceId;
56,365,588✔
212
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
56,365,588✔
213

214
  if (pRsp->code != 0) {
56,365,588✔
215
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
559,508✔
216
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
8,029✔
217
             pMgmt->statusSeq);
218
      pMgmt->pData->dropped = 1;
8,029✔
219
      if (dmWriteEps(pMgmt->pData) != 0) {
8,029✔
220
        dError("failed to write dnode file");
×
221
      }
222
      dInfo("dnode will exit since it is in the dropped state");
8,029✔
223
      (void)raise(SIGINT);
8,029✔
224
    }
225
  } else {
226
    SStatusRsp statusRsp = {0};
55,806,080✔
227
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
58,102,270✔
228
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
2,296,190✔
229
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
2,296,190✔
230
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
2,113,831✔
231
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
232
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
2,113,831✔
233
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
2,113,831✔
234
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
2,113,831✔
235
      }
236
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId);
2,296,190✔
237
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
2,296,190✔
238
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
2,296,190✔
239
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
2,296,190✔
240
    }
241
    tFreeSStatusRsp(&statusRsp);
55,806,080✔
242
  }
243
  rpcFreeCont(pRsp->pCont);
56,365,588✔
244
}
56,365,588✔
245

246
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
56,507,518✔
247
  int32_t    code = 0;
56,507,518✔
248
  SStatusReq req = {0};
56,507,518✔
249
  req.timestamp = taosGetTimestampMs();
56,507,518✔
250
  pMgmt->statusSeq++;
56,507,518✔
251

252
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
56,507,518✔
253
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
56,507,518✔
254
    dError("failed to lock status info lock");
×
255
    return;
196✔
256
  }
257

258
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
56,507,518✔
259
  req.sver = tsVersion;
56,507,518✔
260
  req.dnodeVer = tsDnodeData.dnodeVer;
56,507,518✔
261
  req.dnodeId = tsDnodeData.dnodeId;
56,507,518✔
262
  req.clusterId = tsDnodeData.clusterId;
56,507,518✔
263
  if (req.clusterId == 0) req.dnodeId = 0;
56,507,518✔
264
  req.rebootTime = tsDnodeData.rebootTime;
56,507,518✔
265
  req.updateTime = tsDnodeData.updateTime;
56,507,518✔
266
  req.numOfCores = tsNumOfCores;
56,507,518✔
267
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
56,507,518✔
268
  req.numOfDiskCfg = tsDiskCfgNum;
56,507,518✔
269
  req.memTotal = tsTotalMemoryKB * 1024;
56,507,518✔
270
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
56,507,518✔
271
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
56,507,518✔
272
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
56,507,518✔
273

274
  req.clusterCfg.statusInterval = tsStatusInterval;
56,507,518✔
275
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
56,507,518✔
276
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
56,507,518✔
277
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
56,507,518✔
278
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
56,507,518✔
279
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
56,507,518✔
280
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
56,507,518✔
281
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
56,507,518✔
282
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
56,507,518✔
283
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
56,507,518✔
284
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
56,507,518✔
285
  req.clusterCfg.checkTime = (int64_t)taosGetLocalTimezoneOffset(&code);
56,507,518✔
286
  if (code != 0) {
56,507,518✔
287
    dError("failed to get local timezone offset, since %s", tstrerror(code));
×
288
    (void)taosThreadMutexUnlock(&pMgmt->pData->statusInfolock);
×
289
    return;
×
290
  }
291

292
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
56,507,518✔
293
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
56,507,518✔
294
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
56,507,518✔
295
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
56,507,518✔
296

297
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
56,507,518✔
298

299
  req.pVloads = tsVinfo.pVloads;
56,507,518✔
300
  tsVinfo.pVloads = NULL;
56,507,518✔
301

302
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
56,507,518✔
303
  req.mload = tsMLoad;
56,507,518✔
304

305
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
56,507,518✔
306
    dError("failed to unlock status info lock");
×
307
    return;
×
308
  }
309

310
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
56,507,518✔
311
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
56,507,518✔
312

313
  req.statusSeq = pMgmt->statusSeq;
56,507,518✔
314
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
56,507,518✔
315
  req.analVer = taosAnalyGetVersion();
56,507,518✔
316
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;
56,507,518✔
317

318
  if (tsAuditUseToken) {
56,507,518✔
319
    getAuditDbNameToken(req.auditDB, req.auditToken);
56,503,078✔
320
  }
321

322
  if (tsAuditSaveInSelf) {
56,507,518✔
323
    getAuditEpSet(&req.auditEpSet, &req.auditVgId);
2,535✔
324
  }
325

326
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
56,507,518✔
327
  if (contLen < 0) {
56,507,518✔
328
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
329
    return;
×
330
  }
331

332
  void *pHead = rpcMallocCont(contLen);
56,507,518✔
333
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
56,507,518✔
334
  if (contLen < 0) {
56,507,518✔
335
    rpcFreeCont(pHead);
×
336
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
337
    return;
×
338
  }
339
  tFreeSStatusReq(&req);
56,507,518✔
340

341
  SRpcMsg rpcMsg = {.pCont = pHead,
56,507,518✔
342
                    .contLen = contLen,
343
                    .msgType = TDMT_MND_STATUS,
344
                    .info.ahandle = 0,
345
                    .info.notFreeAhandle = 1,
346
                    .info.refId = 0,
347
                    .info.noResp = 0,
348
                    .info.handle = 0};
349
  SRpcMsg rpcRsp = {0};
56,507,518✔
350

351
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
56,507,518✔
352

353
  SEpSet epSet = {0};
56,507,518✔
354
  int8_t epUpdated = 0;
56,507,518✔
355
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
56,507,518✔
356

357
  if (dDebugFlag & DEBUG_TRACE) {
56,507,518✔
358
    char tbuf[512];
2,006,542✔
359
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
2,020,808✔
360
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
2,020,808✔
361
  }
362
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
56,507,518✔
363
  if (code != 0) {
56,507,518✔
364
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
141,930✔
365
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
141,930✔
366
      dmRotateMnodeEpSet(pMgmt->pData);
141,930✔
367
      char tbuf[512];
141,734✔
368
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
141,930✔
369
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
141,930✔
370
            tbuf, epSet.inUse);
371
    }
372
    return;
141,930✔
373
  }
374

375
  if (rpcRsp.code != 0) {
56,365,588✔
376
    dmRotateMnodeEpSet(pMgmt->pData);
559,508✔
377
    char tbuf[512];
559,508✔
378
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
559,508✔
379
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
559,508✔
380
          tbuf, epSet.inUse);
381
  } else {
382
    if (epUpdated == 1) {
55,806,080✔
383
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
74,445✔
384
    }
385
  }
386
  dmProcessStatusRsp(pMgmt, &rpcRsp);
56,365,588✔
387
}
388

389
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
682,221✔
390
  const STraceId *trace = &pRsp->info.traceId;
682,221✔
391
  int32_t         code = 0;
682,221✔
392
  SConfigRsp      configRsp = {0};
682,221✔
393
  bool            needStop = false;
682,221✔
394

395
  if (pRsp->code != 0) {
682,221✔
396
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
397
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
398
      pMgmt->pData->dropped = 1;
×
399
      if (dmWriteEps(pMgmt->pData) != 0) {
×
400
        dError("failed to write dnode file");
×
401
      }
402
      dInfo("dnode will exit since it is in the dropped state");
×
403
      (void)raise(SIGINT);
×
404
    }
405
  } else {
406
    bool needUpdate = false;
682,221✔
407
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
1,364,442✔
408
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
682,221✔
409
      // Try to use cfg from mnode sdb.
410
      if (!configRsp.isVersionVerified) {
682,221✔
411
        uInfo("config version not verified, update config");
534,500✔
412
        needUpdate = true;
534,500✔
413
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
534,500✔
414
        if (code != TSDB_CODE_SUCCESS) {
534,500✔
415
          dError("failed to persist global config since %s", tstrerror(code));
12,390✔
416
          goto _exit;
12,390✔
417
        }
418
      }
419
    }
420
    if (needUpdate) {
669,831✔
421
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
522,110✔
422
      if (code != TSDB_CODE_SUCCESS) {
522,110✔
423
        dError("failed to update config since %s", tstrerror(code));
×
424
        goto _exit;
×
425
      }
426
      code = setAllConfigs(tsCfg);
522,110✔
427
      if (code != TSDB_CODE_SUCCESS) {
522,110✔
428
        dError("failed to set all configs since %s", tstrerror(code));
1,584✔
429
        goto _exit;
1,584✔
430
      }
431
    }
432
    code = taosPersistLocalConfig(pMgmt->path);
668,247✔
433
    if (code != TSDB_CODE_SUCCESS) {
668,247✔
434
      dError("failed to persist local config since %s", tstrerror(code));
×
435
    }
436
    tsConfigInited = 1;
668,247✔
437
  }
438
_exit:
682,221✔
439
  tFreeSConfigRsp(&configRsp);
682,221✔
440
  rpcFreeCont(pRsp->pCont);
682,221✔
441
  if (needStop) {
682,221✔
442
    dmStop();
×
443
  }
444
}
682,221✔
445

446
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
643,619✔
447
  const STraceId *trace = &pRsp->info.traceId;
643,619✔
448
  int32_t         code = 0;
643,619✔
449
  SKeySyncRsp     keySyncRsp = {0};
643,619✔
450

451
  if (pRsp->code != 0) {
643,619✔
452
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
×
453
    code = pRsp->code;
×
454
    goto _exit;
×
455
  }
456

457
  if (pRsp->pCont == NULL || pRsp->contLen <= 0) {
643,619✔
458
    dError("invalid key sync response, empty content");
×
459
    code = TSDB_CODE_INVALID_MSG;
×
460
    goto _exit;
×
461
  }
462

463
  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
643,619✔
464
  if (code != 0) {
643,619✔
465
    dError("failed to deserialize key sync response since %s", tstrerror(code));
×
466
    goto _exit;
×
467
  }
468

469
  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
643,619✔
470
        tsLocalKeyVersion, keySyncRsp.needUpdate);
471
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;
643,619✔
472
  if (keySyncRsp.needUpdate) {
643,619✔
473
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
474
    // Get encrypt file path from tsDataDir
475
    char masterKeyFile[PATH_MAX] = {0};
1,642✔
476
    char derivedKeyFile[PATH_MAX] = {0};
1,642✔
477
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,642✔
478
             TD_DIRSEP);
479
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,642✔
480
             TD_DIRSEP);
481

482
    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
1,642✔
483
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);
484

485
    // Save keys to master.bin and derived.bin
486
    // Use the same algorithm for cfg and meta keys (backward compatible)
487
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey, keySyncRsp.metaKey,
1,642✔
488
                                keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm, keySyncRsp.algorithm,
489
                                keySyncRsp.keyVersion, keySyncRsp.createTime,
490
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
491
    if (code != 0) {
1,642✔
492
      dError("failed to save encryption keys since %s", tstrerror(code));
×
493
      goto _exit;
×
494
    }
495

496
    // Update global variables with synced keys
497
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
1,642✔
498
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
1,642✔
499
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
1,642✔
500
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
1,642✔
501
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
1,642✔
502
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
1,642✔
503
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
1,642✔
504
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
1,642✔
505
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
1,642✔
506
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;
1,642✔
507

508
    // Update local key version
509
    tsLocalKeyVersion = keySyncRsp.keyVersion;
1,642✔
510
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);
1,642✔
511

512
    // Encrypt existing plaintext config files
513
    code = taosEncryptExistingCfgFiles(tsDataDir);
1,642✔
514
    if (code != 0) {
1,642✔
515
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
×
516
      // Don't fail the key sync, files will be encrypted on next write
517
      code = 0;
×
518
    }
519
#else
520
    dWarn("enterprise features not enabled, skipping key sync");
521
#endif
522
  } else {
523
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
641,977✔
524
  }
525
  
526
  code = TSDB_CODE_SUCCESS;
643,619✔
527

528
_exit:
643,619✔
529
  rpcFreeCont(pRsp->pCont);
643,619✔
530
  return code;
643,619✔
531
}
532

533
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
662,173✔
534
  int32_t     code = 0;
662,173✔
535
  SKeySyncReq req = {0};
662,173✔
536

537
  req.dnodeId = pMgmt->pData->dnodeId;
662,173✔
538
  req.keyVersion = tsLocalKeyVersion;
662,173✔
539
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);
662,173✔
540

541
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
662,173✔
542
  if (contLen < 0) {
662,173✔
543
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
UNCOV
544
    return;
×
545
  }
546

547
  void *pHead = rpcMallocCont(contLen);
662,173✔
548
  if (pHead == NULL) {
662,173✔
549
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
550
    return;
×
551
  }
552
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
662,173✔
553
  if (contLen < 0) {
662,173✔
554
    rpcFreeCont(pHead);
×
555
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
556
    return;
×
557
  }
558

559
  SRpcMsg rpcMsg = {.pCont = pHead,
662,173✔
560
                    .contLen = contLen,
561
                    .msgType = TDMT_MND_KEY_SYNC,
562
                    .info.ahandle = 0,
563
                    .info.notFreeAhandle = 1,
564
                    .info.refId = 0,
565
                    .info.noResp = 0,
566
                    .info.handle = 0};
567
  SRpcMsg rpcRsp = {0};
662,173✔
568

569
  SEpSet epSet = {0};
662,173✔
570
  int8_t epUpdated = 0;
662,173✔
571
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
662,173✔
572

573
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
662,173✔
574
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
662,173✔
575
  if (code != 0) {
662,173✔
576
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
18,554✔
577
    return;
18,554✔
578
  }
579
  if (rpcRsp.code != 0) {
643,619✔
580
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
×
581
    return;
×
582
  }
583
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
643,619✔
584
  if (code != 0) {
643,619✔
585
    dError("failed to process key sync rsp since %s", tstrerror(code));
×
586
    return;
×
587
  }
588
}
589

590
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
705,441✔
591
  int32_t    code = 0;
705,441✔
592
  SConfigReq req = {0};
705,441✔
593

594
  req.cver = tsdmConfigVersion;
705,441✔
595
  req.forceReadConfig = true;
705,441✔
596
  req.array = taosGetGlobalCfg(tsCfg);
705,441✔
597
  dDebug("send config req to mnode, configVersion:%d", req.cver);
705,441✔
598

599
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
705,441✔
600
  if (contLen < 0) {
705,441✔
601
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
602
    return;
294✔
603
  }
604

605
  void *pHead = rpcMallocCont(contLen);
705,441✔
606
  if (pHead == NULL) {
705,441✔
607
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
608
    return;
×
609
  }
610
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
705,441✔
611
  if (contLen < 0) {
705,441✔
612
    rpcFreeCont(pHead);
×
613
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
614
    return;
×
615
  }
616

617
  SRpcMsg rpcMsg = {.pCont = pHead,
705,441✔
618
                    .contLen = contLen,
619
                    .msgType = TDMT_MND_CONFIG,
620
                    .info.ahandle = 0,
621
                    .info.notFreeAhandle = 1,
622
                    .info.refId = 0,
623
                    .info.noResp = 0,
624
                    .info.handle = 0};
625
  SRpcMsg rpcRsp = {0};
705,441✔
626

627
  SEpSet epSet = {0};
705,441✔
628
  int8_t epUpdated = 0;
705,441✔
629
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
705,441✔
630

631
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
705,441✔
632
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
705,441✔
633
  if (code != 0) {
705,441✔
634
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
23,220✔
635
    return;
23,220✔
636
  }
637
  if (rpcRsp.code != 0) {
682,221✔
638
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
639
    return;
×
640
  }
641
  dmProcessConfigRsp(pMgmt, &rpcRsp);
682,221✔
642
}
643

644
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
57,344,636✔
645
  dDebug("begin to get dnode info");
57,344,636✔
646
  SDnodeData dnodeData = {0};
57,344,636✔
647
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
57,344,636✔
648
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
57,344,636✔
649
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
57,344,636✔
650
  dnodeData.clusterId = pMgmt->pData->clusterId;
57,344,636✔
651
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
57,344,636✔
652
  dnodeData.updateTime = pMgmt->pData->updateTime;
57,344,636✔
653
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
57,344,636✔
654
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
57,344,636✔
655

656
  dDebug("begin to get vnode loads");
57,344,636✔
657
  SMonVloadInfo vinfo = {0};
57,344,636✔
658
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
57,344,636✔
659

660
  dDebug("begin to get mnode loads");
57,344,636✔
661
  SMonMloadInfo minfo = {0};
57,344,636✔
662
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
57,344,636✔
663

664
  dDebug("begin to lock status info");
57,344,636✔
665
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
57,344,636✔
666
    dError("failed to lock status info lock");
×
667
    return;
×
668
  }
669
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
57,344,636✔
670
  tsDnodeData.dnodeId = dnodeData.dnodeId;
57,344,636✔
671
  tsDnodeData.clusterId = dnodeData.clusterId;
57,344,636✔
672
  tsDnodeData.rebootTime = dnodeData.rebootTime;
57,344,636✔
673
  tsDnodeData.updateTime = dnodeData.updateTime;
57,344,636✔
674
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
57,344,636✔
675

676
  if (tsVinfo.pVloads == NULL) {
57,344,636✔
677
    tsVinfo.pVloads = vinfo.pVloads;
55,472,826✔
678
    vinfo.pVloads = NULL;
55,472,826✔
679
  } else {
680
    taosArrayDestroy(vinfo.pVloads);
1,871,810✔
681
    vinfo.pVloads = NULL;
1,871,810✔
682
  }
683

684
  tsMLoad = minfo.load;
57,344,636✔
685

686
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
57,344,636✔
687
    dError("failed to unlock status info lock");
×
688
    return;
×
689
  }
690
}
691

692
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
693
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
694
  if (contLen < 0) {
×
695
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
696
    return;
×
697
  }
698
  void *pHead = rpcMallocCont(contLen);
×
699
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
700
  if (contLen < 0) {
×
701
    rpcFreeCont(pHead);
×
702
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
703
    return;
×
704
  }
705

706
  SRpcMsg rpcMsg = {.pCont = pHead,
×
707
                    .contLen = contLen,
708
                    .msgType = TDMT_MND_NOTIFY,
709
                    .info.ahandle = 0,
710
                    .info.notFreeAhandle = 1,
711
                    .info.refId = 0,
712
                    .info.noResp = 1,
713
                    .info.handle = 0};
714

715
  SEpSet epSet = {0};
×
716
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
717
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
718
    dError("failed to send notify req");
×
719
  }
720
}
721

722
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
723
  dError("auth rsp is received, but not supported yet");
×
724
  return 0;
×
725
}
726

727
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
728
  dError("grant rsp is received, but not supported yet");
×
729
  return 0;
×
730
}
731

732
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
96,754✔
733
  int32_t       code = 0;
96,754✔
734
  SDCfgDnodeReq cfgReq = {0};
96,754✔
735
  SConfig      *pCfg = taosGetCfg();
96,754✔
736
  SConfigItem  *pItem = NULL;
96,754✔
737

738
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
96,754✔
739
    return TSDB_CODE_INVALID_MSG;
×
740
  }
741
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
96,754✔
742
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,673✔
743
  }
744

745
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
93,081✔
746

747
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
93,081✔
748
  if (code != 0) {
93,081✔
749
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
225✔
750
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
225✔
751
      return TSDB_CODE_SUCCESS;
225✔
752
    } else {
753
      return code;
×
754
    }
755
  }
756
  if (pItem == NULL) {
92,856✔
757
    return TSDB_CODE_CFG_NOT_FOUND;
×
758
  }
759

760
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
92,856✔
761
    char value[10] = {0};
×
762
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
763
      tsSyncTimeout = 0;
×
764
    }
765

766
    if (tsSyncTimeout > 0) {
×
767
      SConfigItem *pItemTmp = NULL;
×
768
      char         tmp[10] = {0};
×
769

770
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
×
771
      TAOS_CHECK_RETURN(
×
772
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
773
      if (pItemTmp == NULL) {
×
774
        return TSDB_CODE_CFG_NOT_FOUND;
×
775
      }
776

777
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
×
778
      TAOS_CHECK_RETURN(
×
779
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
780
      if (pItemTmp == NULL) {
×
781
        return TSDB_CODE_CFG_NOT_FOUND;
×
782
      }
783
      TAOS_CHECK_RETURN(
×
784
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
785
      if (pItemTmp == NULL) {
×
786
        return TSDB_CODE_CFG_NOT_FOUND;
×
787
      }
788

789
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
790
      TAOS_CHECK_RETURN(
×
791
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
792
      if (pItemTmp == NULL) {
×
793
        return TSDB_CODE_CFG_NOT_FOUND;
×
794
      }
795
      TAOS_CHECK_RETURN(
×
796
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
797
      if (pItemTmp == NULL) {
×
798
        return TSDB_CODE_CFG_NOT_FOUND;
×
799
      }
800
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
801
      if (pItemTmp == NULL) {
×
802
        return TSDB_CODE_CFG_NOT_FOUND;
×
803
      }
804

805
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
806
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
807
      if (pItemTmp == NULL) {
×
808
        return TSDB_CODE_CFG_NOT_FOUND;
×
809
      }
810

811
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
812
      TAOS_CHECK_RETURN(
×
813
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
814
      if (pItemTmp == NULL) {
×
815
        return TSDB_CODE_CFG_NOT_FOUND;
×
816
      }
817
      TAOS_CHECK_RETURN(
×
818
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
819
      if (pItemTmp == NULL) {
×
820
        return TSDB_CODE_CFG_NOT_FOUND;
×
821
      }
822
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
823
      if (pItemTmp == NULL) {
×
824
        return TSDB_CODE_CFG_NOT_FOUND;
×
825
      }
826

827
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
828
            tsSyncTimeout);
829
    }
830
  }
831

832
  if (!isConifgItemLazyMode(pItem)) {
92,856✔
833
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
92,184✔
834

835
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
92,184✔
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
838
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
839

840
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
841
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
842
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
843
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
844

845
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
846
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
847
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
848

849
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
850
            tsSyncTimeout);
851
    }
852
  }
853

854
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
92,856✔
855
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
16,208✔
856
    if (code != TSDB_CODE_SUCCESS) {
16,208✔
857
      dError("failed to persist global config since %s", tstrerror(code));
×
858
    }
859
  } else {
860
    code = taosPersistLocalConfig(pMgmt->path);
76,648✔
861
    if (code != TSDB_CODE_SUCCESS) {
76,648✔
862
      dError("failed to persist local config since %s", tstrerror(code));
×
863
    }
864
  }
865

866
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
92,856✔
867
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
868

869
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
870
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
871
  }
872

873
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
92,856✔
874
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
92,856✔
875
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
876
  }
877

878
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
92,856✔
879
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
92,856✔
880
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
881
  }
882

883
  if (cfgReq.version > 0) {
92,856✔
884
    tsdmConfigVersion = cfgReq.version;
23,582✔
885
  }
886
  return code;
92,856✔
887
}
888

889
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
188✔
890
#ifdef TD_ENTERPRISE
891
  int32_t       code = 0;
188✔
892
  SDCfgDnodeReq cfgReq = {0};
188✔
893
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
188✔
894
    code = TSDB_CODE_INVALID_MSG;
×
895
    goto _exit;
×
896
  }
897

898
  code = dmUpdateEncryptKey(cfgReq.value, true);
188✔
899
  if (code == 0) {
188✔
900
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
188✔
901
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
188✔
902
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
188✔
903
  }
904

905
_exit:
188✔
906
  pMsg->code = code;
188✔
907
  pMsg->info.rsp = NULL;
188✔
908
  pMsg->info.rspLen = 0;
188✔
909
  return code;
188✔
910
#else
911
  return 0;
912
#endif
913
}
914

915
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
916
// Verification plaintext used to validate encryption keys
917
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
918

919
// Save key verification file with encrypted plaintext for each key
920
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,404✔
921
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
922
                                     int32_t metaAlgorithm) {
923
  char    verifyFile[PATH_MAX] = {0};
2,404✔
924
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,404✔
925
                            TD_DIRSEP, TD_DIRSEP);
926
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,404✔
927
    dError("failed to build key verification file path");
×
928
    return TSDB_CODE_OUT_OF_BUFFER;
×
929
  }
930

931
  int32_t     code = 0;
2,404✔
932
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
2,404✔
933
  int32_t     plaintextLen = strlen(plaintext);
2,404✔
934

935
  // Array of keys and their algorithms
936
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
2,404✔
937
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
2,404✔
938
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
2,404✔
939

940
  // Calculate total buffer size
941
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
2,404✔
942
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
2,404✔
943
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
2,404✔
944
  int32_t totalSize = headerSize + perKeySize * 5;
2,404✔
945

946
  // Allocate buffer for all data
947
  char *buffer = taosMemoryMalloc(totalSize);
2,404✔
948
  if (buffer == NULL) {
2,404✔
949
    dError("failed to allocate memory for key verification buffer");
×
950
    return terrno;
×
951
  }
952

953
  char *ptr = buffer;
2,404✔
954

955
  // Write magic number and version to buffer
956
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
2,404✔
957
  uint16_t version = 1;
2,404✔
958
  memcpy(ptr, &magic, sizeof(magic));
2,404✔
959
  ptr += sizeof(magic);
2,404✔
960
  memcpy(ptr, &version, sizeof(version));
2,404✔
961
  ptr += sizeof(version);
2,404✔
962

963
  // Encrypt all keys and write to buffer
964
  char paddedPlaintext[512] = {0};
2,404✔
965
  memcpy(paddedPlaintext, plaintext, plaintextLen);
2,404✔
966

967
  for (int i = 0; i < 5; i++) {
14,424✔
968
    char encrypted[512] = {0};
12,020✔
969

970
    // Encrypt the verification plaintext with this key using CBC
971
    SCryptOpts opts = {0};
12,020✔
972
    opts.len = encryptedLen;
12,020✔
973
    opts.source = paddedPlaintext;
12,020✔
974
    opts.result = encrypted;
12,020✔
975
    opts.unitLen = 16;
12,020✔
976
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
12,020✔
977
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
12,020✔
978

979
    int32_t count = CBC_Encrypt(&opts);
12,020✔
980
    if (count != opts.len) {
12,020✔
981
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
982
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
983
             tstrerror(code));
984
      taosMemoryFree(buffer);
×
985
      return code;
×
986
    }
987

988
    // Write to buffer: algorithm + encrypted length + encrypted data
989
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
12,020✔
990
    ptr += sizeof(int32_t);
12,020✔
991
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
12,020✔
992
    ptr += sizeof(int32_t);
12,020✔
993
    memcpy(ptr, encrypted, encryptedLen);
12,020✔
994
    ptr += encryptedLen;
12,020✔
995

996
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
12,020✔
997
  }
998

999
  // Write all data to file in one operation
1000
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
2,404✔
1001
  if (pFile == NULL) {
2,404✔
1002
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
1003
    taosMemoryFree(buffer);
×
1004
    return TSDB_CODE_FILE_CORRUPTED;
×
1005
  }
1006

1007
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
2,404✔
1008
  (void)taosCloseFile(&pFile);
2,404✔
1009
  taosMemoryFree(buffer);
2,404✔
1010

1011
  if (written != totalSize) {
2,404✔
1012
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1013
    return TSDB_CODE_FILE_CORRUPTED;
×
1014
  }
1015

1016
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
2,404✔
1017
  return 0;
2,404✔
1018
}
1019

1020
// Verify all encryption keys by decrypting and comparing with original plaintext
1021
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
2,104✔
1022
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1023
                                      int32_t metaAlgorithm) {
1024
  char    verifyFile[PATH_MAX] = {0};
2,104✔
1025
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
2,104✔
1026
                            TD_DIRSEP, TD_DIRSEP);
1027
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
2,104✔
1028
    dError("failed to build key verification file path");
×
1029
    return TSDB_CODE_OUT_OF_BUFFER;
×
1030
  }
1031

1032
  // Get file size
1033
  int64_t fileSize = 0;
2,104✔
1034
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
2,104✔
1035
    // File doesn't exist, create it with current keys
1036
    dInfo("key verification file not found, creating new one");
2,104✔
1037
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,104✔
1038
  }
1039

1040
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1041
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1042
    return TSDB_CODE_FILE_CORRUPTED;
×
1043
  }
1044

1045
  // Allocate buffer and read entire file
1046
  char *buffer = taosMemoryMalloc(fileSize);
×
1047
  if (buffer == NULL) {
×
1048
    dError("failed to allocate memory for key verification buffer");
×
1049
    return terrno;
×
1050
  }
1051

1052
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1053
  if (pFile == NULL) {
×
1054
    dError("failed to open key verification file:%s", verifyFile);
×
1055
    taosMemoryFree(buffer);
×
1056
    return TSDB_CODE_FILE_CORRUPTED;
×
1057
  }
1058

1059
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1060
  (void)taosCloseFile(&pFile);
×
1061

1062
  if (bytesRead != fileSize) {
×
1063
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1064
    taosMemoryFree(buffer);
×
1065
    return TSDB_CODE_FILE_CORRUPTED;
×
1066
  }
1067

1068
  int32_t     code = 0;
×
1069
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1070
  int32_t     plaintextLen = strlen(plaintext);
×
1071
  const char *ptr = buffer;
×
1072

1073
  // Parse and verify header
1074
  uint32_t magic = 0;
×
1075
  uint16_t version = 0;
×
1076
  memcpy(&magic, ptr, sizeof(magic));
×
1077
  ptr += sizeof(magic);
×
1078
  memcpy(&version, ptr, sizeof(version));
×
1079
  ptr += sizeof(version);
×
1080

1081
  if (magic != 0x544B5659) {
×
1082
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1083
    taosMemoryFree(buffer);
×
1084
    return TSDB_CODE_FILE_CORRUPTED;
×
1085
  }
1086

1087
  // Array of keys and their algorithms
1088
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1089
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1090
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1091

1092
  // Verify each key from buffer
1093
  for (int i = 0; i < 5; i++) {
×
1094
    // Check if we have enough data remaining
1095
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1096
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1097
      taosMemoryFree(buffer);
×
1098
      return TSDB_CODE_FILE_CORRUPTED;
×
1099
    }
1100

1101
    int32_t savedAlgo = 0;
×
1102
    int32_t encryptedLen = 0;
×
1103

1104
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1105
    ptr += sizeof(int32_t);
×
1106
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1107
    ptr += sizeof(int32_t);
×
1108

1109
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1110
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1111
      taosMemoryFree(buffer);
×
1112
      return TSDB_CODE_FILE_CORRUPTED;
×
1113
    }
1114

1115
    if (ptr - buffer + encryptedLen > fileSize) {
×
1116
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1117
      taosMemoryFree(buffer);
×
1118
      return TSDB_CODE_FILE_CORRUPTED;
×
1119
    }
1120

1121
    uint8_t encrypted[512] = {0};
×
1122
    memcpy(encrypted, ptr, encryptedLen);
×
1123
    ptr += encryptedLen;
×
1124

1125
    // Decrypt with current key using CBC
1126
    char decrypted[512] = {0};
×
1127

1128
    SCryptOpts opts = {0};
×
1129
    opts.len = encryptedLen;
×
1130
    opts.source = (char *)encrypted;
×
1131
    opts.result = decrypted;
×
1132
    opts.unitLen = 16;
×
1133
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
1134
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1135

1136
    int32_t count = CBC_Decrypt(&opts);
×
1137
    if (count != opts.len) {
×
1138
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1139
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1140
             count, opts.len, tstrerror(code));
1141
      taosMemoryFree(buffer);
×
1142
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1143
    }
1144

1145
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1146
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1147
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1148
      taosMemoryFree(buffer);
×
1149
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1150
    }
1151

1152
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1153
  }
1154

1155
  taosMemoryFree(buffer);
×
1156
  dInfo("all encryption keys verified successfully");
×
1157
  return 0;
×
1158
}
1159

1160
// Public API: Verify and initialize encryption keys at startup
1161
int32_t dmVerifyAndInitEncryptionKeys(void) {
673,078✔
1162
  // Skip verification in dump sdb mode (taosd -s)
1163
  if (tsSkipKeyCheckMode) {
673,078✔
1164
    dInfo("skip encryption key verification in some special check mode");
1,584✔
1165
    return 0;
1,584✔
1166
  }
1167

1168
  // Check if encryption keys are loaded
1169
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
671,494✔
1170
    dDebug("encryption keys not loaded, skipping verification");
669,390✔
1171
    return 0;
669,390✔
1172
  }
1173

1174
  // Get key file paths
1175
  char    masterKeyFile[PATH_MAX] = {0};
2,104✔
1176
  char    derivedKeyFile[PATH_MAX] = {0};
2,104✔
1177
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
2,104✔
1178
                            TD_DIRSEP, TD_DIRSEP);
1179
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
2,104✔
1180
    dError("failed to build master key file path");
×
1181
    return TSDB_CODE_OUT_OF_BUFFER;
×
1182
  }
1183

1184
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
2,104✔
1185
                    TD_DIRSEP, TD_DIRSEP);
1186
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
2,104✔
1187
    dError("failed to build derived key file path");
×
1188
    return TSDB_CODE_OUT_OF_BUFFER;
×
1189
  }
1190

1191
  // Load encryption keys
1192
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
2,104✔
1193
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
2,104✔
1194
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
2,104✔
1195
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
2,104✔
1196
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
2,104✔
1197
  int32_t algorithm = 0;
2,104✔
1198
  int32_t cfgAlgorithm = 0;
2,104✔
1199
  int32_t metaAlgorithm = 0;
2,104✔
1200
  int32_t fileVersion = 0;
2,104✔
1201
  int32_t keyVersion = 0;
2,104✔
1202
  int64_t createTime = 0;
2,104✔
1203
  int64_t svrKeyUpdateTime = 0;
2,104✔
1204
  int64_t dbKeyUpdateTime = 0;
2,104✔
1205

1206
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
2,104✔
1207
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1208
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1209
  if (code != 0) {
2,104✔
1210
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1211
    return code;
×
1212
  }
1213

1214
  // Verify all keys
1215
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
2,104✔
1216
  if (code != 0) {
2,104✔
1217
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1218
    return code;
×
1219
  }
1220

1221
  dInfo("encryption keys verified and initialized successfully");
2,104✔
1222
  return 0;
2,104✔
1223
}
1224
#else
1225
int32_t dmVerifyAndInitEncryptionKeys(void) {
1226
  // Community edition or no TaosK support
1227
  return 0;
1228
}
1229
#endif
1230

1231
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1232
static int32_t dmUpdateSvrKey(const char *newKey) {
150✔
1233
  if (newKey == NULL || newKey[0] == '\0') {
150✔
1234
    dError("invalid new SVR_KEY, key is empty");
×
1235
    return TSDB_CODE_INVALID_PARA;
×
1236
  }
1237

1238
  char masterKeyFile[PATH_MAX] = {0};
150✔
1239
  char derivedKeyFile[PATH_MAX] = {0};
150✔
1240

1241
  // Build path to key files
1242
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
150✔
1243
                            TD_DIRSEP, TD_DIRSEP);
1244
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
150✔
1245
    dError("failed to build master key file path");
×
1246
    return TSDB_CODE_OUT_OF_BUFFER;
×
1247
  }
1248

1249
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
150✔
1250
                    TD_DIRSEP, TD_DIRSEP);
1251
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
150✔
1252
    dError("failed to build derived key file path");
×
1253
    return TSDB_CODE_OUT_OF_BUFFER;
×
1254
  }
1255

1256
  // Load current keys
1257
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1258
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1259
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1260
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1261
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1262
  int32_t algorithm = 0;
150✔
1263
  int32_t cfgAlgorithm = 0;
150✔
1264
  int32_t metaAlgorithm = 0;
150✔
1265
  int32_t fileVersion = 0;
150✔
1266
  int32_t keyVersion = 0;
150✔
1267
  int64_t createTime = 0;
150✔
1268
  int64_t svrKeyUpdateTime = 0;
150✔
1269
  int64_t dbKeyUpdateTime = 0;
150✔
1270

1271
  int32_t code =
1272
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
150✔
1273
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1274
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1275
  if (code != 0) {
150✔
1276
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1277
    return code;
×
1278
  }
1279

1280
  // Update SVR_KEY
1281
  int64_t now = taosGetTimestampMs();
150✔
1282
  int32_t newKeyVersion = keyVersion + 1;
150✔
1283

1284
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
150✔
1285
  tstrncpy(svrKey, newKey, sizeof(svrKey));
150✔
1286
  svrKeyUpdateTime = now;
150✔
1287

1288
  // Save updated keys (use algorithm for all keys for backward compatibility)
1289
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
150✔
1290
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1291
  if (code != 0) {
150✔
1292
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1293
    return code;
×
1294
  }
1295

1296
  // Update key verification file with new SVR_KEY
1297
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
150✔
1298
  if (code != 0) {
150✔
1299
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1300
    // Don't fail the operation if verification file update fails
1301
  }
1302

1303
  // Update global variables
1304
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
150✔
1305
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
150✔
1306
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
150✔
1307
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
150✔
1308
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
150✔
1309
  tsEncryptAlgorithmType = algorithm;
150✔
1310
  tsEncryptFileVersion = fileVersion;
150✔
1311
  tsEncryptKeyVersion = newKeyVersion;
150✔
1312
  tsEncryptKeyCreateTime = createTime;
150✔
1313
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
150✔
1314
  tsDbKeyUpdateTime = dbKeyUpdateTime;
150✔
1315

1316
  // Update encryption key status for backward compatibility
1317
  int keyLen = strlen(tsDataKey);
150✔
1318
  if (keyLen > ENCRYPT_KEY_LEN) {
150✔
1319
    keyLen = ENCRYPT_KEY_LEN;
×
1320
  }
1321
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
150✔
1322
  memcpy(tsEncryptKey, tsDataKey, keyLen);
150✔
1323
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
150✔
1324
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
150✔
1325

1326
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
150✔
1327
  return 0;
150✔
1328
}
1329

1330
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
1331
  if (days < 0) {
×
1332
    dError("invalid days value:%d, must be >= 0", days);
×
1333
    return TSDB_CODE_INVALID_PARA;
×
1334
  }
1335

1336
  if (strategy == NULL || strategy[0] == '\0') {
×
1337
    dError("invalid strategy, strategy is empty");
×
1338
    return TSDB_CODE_INVALID_PARA;
×
1339
  }
1340

1341
  // Validate strategy value
1342
  if (strcmp(strategy, "ALARM") != 0) {
×
1343
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1344
  }
1345

1346
  // Update global variables directly
1347
  tsKeyExpirationDays = days;
×
1348
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1349

1350
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
1351
  return 0;
×
1352
}
1353

1354
static int32_t dmUpdateDbKey(const char *newKey) {
150✔
1355
  if (newKey == NULL || newKey[0] == '\0') {
150✔
1356
    dError("invalid new DB_KEY, key is empty");
×
1357
    return TSDB_CODE_INVALID_PARA;
×
1358
  }
1359

1360
  char masterKeyFile[PATH_MAX] = {0};
150✔
1361
  char derivedKeyFile[PATH_MAX] = {0};
150✔
1362

1363
  // Build path to key files
1364
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
150✔
1365
                            TD_DIRSEP, TD_DIRSEP);
1366
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
150✔
1367
    dError("failed to build master key file path");
×
1368
    return TSDB_CODE_OUT_OF_BUFFER;
×
1369
  }
1370

1371
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
150✔
1372
                    TD_DIRSEP, TD_DIRSEP);
1373
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
150✔
1374
    dError("failed to build derived key file path");
×
1375
    return TSDB_CODE_OUT_OF_BUFFER;
×
1376
  }
1377

1378
  // Load current keys
1379
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1380
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1381
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1382
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1383
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
150✔
1384
  int32_t algorithm = 0;
150✔
1385
  int32_t cfgAlgorithm = 0;
150✔
1386
  int32_t metaAlgorithm = 0;
150✔
1387
  int32_t fileVersion = 0;
150✔
1388
  int32_t keyVersion = 0;
150✔
1389
  int64_t createTime = 0;
150✔
1390
  int64_t svrKeyUpdateTime = 0;
150✔
1391
  int64_t dbKeyUpdateTime = 0;
150✔
1392

1393
  int32_t code =
1394
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
150✔
1395
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1396
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1397
  if (code != 0) {
150✔
1398
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1399
    return code;
×
1400
  }
1401

1402
  // Update DB_KEY
1403
  int64_t now = taosGetTimestampMs();
150✔
1404
  int32_t newKeyVersion = keyVersion + 1;
150✔
1405

1406
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
150✔
1407
  tstrncpy(dbKey, newKey, sizeof(dbKey));
150✔
1408
  dbKeyUpdateTime = now;
150✔
1409

1410
  // Save updated keys (use algorithm for all keys for backward compatibility)
1411
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
150✔
1412
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1413
  if (code != 0) {
150✔
1414
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1415
    return code;
×
1416
  }
1417

1418
  // Update key verification file with new DB_KEY
1419
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
150✔
1420
  if (code != 0) {
150✔
1421
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1422
    // Don't fail the operation if verification file update fails
1423
  }
1424

1425
  // Update global variables
1426
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
150✔
1427
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
150✔
1428
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
150✔
1429
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
150✔
1430
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
150✔
1431
  tsEncryptAlgorithmType = algorithm;
150✔
1432
  tsEncryptFileVersion = fileVersion;
150✔
1433
  tsEncryptKeyVersion = newKeyVersion;
150✔
1434
  tsEncryptKeyCreateTime = createTime;
150✔
1435
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
150✔
1436
  tsDbKeyUpdateTime = dbKeyUpdateTime;
150✔
1437

1438
  // Update encryption key status for backward compatibility
1439
  int keyLen = strlen(tsDataKey);
150✔
1440
  if (keyLen > ENCRYPT_KEY_LEN) {
150✔
1441
    keyLen = ENCRYPT_KEY_LEN;
×
1442
  }
1443
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
150✔
1444
  memcpy(tsEncryptKey, tsDataKey, keyLen);
150✔
1445
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
150✔
1446
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
150✔
1447

1448
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
150✔
1449
  return 0;
150✔
1450
}
1451
#endif
1452

1453
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
300✔
1454
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1455
  int32_t              code = 0;
300✔
1456
  SMAlterEncryptKeyReq alterKeyReq = {0};
300✔
1457
  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &alterKeyReq) != 0) {
300✔
1458
    code = TSDB_CODE_INVALID_MSG;
×
1459
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
×
1460
    goto _exit;
×
1461
  }
1462

1463
  dInfo("received alter encrypt key req, keyType:%d", alterKeyReq.keyType);
300✔
1464

1465
  // Update the specified key (svr_key or db_key)
1466
  if (alterKeyReq.keyType == 0) {
300✔
1467
    // Update SVR_KEY
1468
    code = dmUpdateSvrKey(alterKeyReq.newKey);
150✔
1469
    if (code == 0) {
150✔
1470
      dInfo("successfully updated SVR_KEY");
150✔
1471
    } else {
1472
      dError("failed to update SVR_KEY, since %s", tstrerror(code));
×
1473
    }
1474
  } else if (alterKeyReq.keyType == 1) {
150✔
1475
    // Update DB_KEY
1476
    code = dmUpdateDbKey(alterKeyReq.newKey);
150✔
1477
    if (code == 0) {
150✔
1478
      dInfo("successfully updated DB_KEY");
150✔
1479
    } else {
1480
      dError("failed to update DB_KEY, since %s", tstrerror(code));
×
1481
    }
1482
  } else {
1483
    dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", alterKeyReq.keyType);
×
1484
    code = TSDB_CODE_INVALID_PARA;
×
1485
  }
1486

1487
_exit:
300✔
1488
  tFreeSMAlterEncryptKeyReq(&alterKeyReq);
300✔
1489
  pMsg->code = code;
300✔
1490
  pMsg->info.rsp = NULL;
300✔
1491
  pMsg->info.rspLen = 0;
300✔
1492
  return code;
300✔
1493
#else
1494
  dError("encryption key management is only available in enterprise edition");
1495
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1496
  pMsg->info.rsp = NULL;
1497
  pMsg->info.rspLen = 0;
1498
  return TSDB_CODE_OPS_NOT_SUPPORT;
1499
#endif
1500
}
1501

1502
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1503
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1504
  int32_t                 code = 0;
×
1505
  SMAlterKeyExpirationReq alterReq = {0};
×
1506
  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
×
1507
    code = TSDB_CODE_INVALID_MSG;
×
1508
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
×
1509
    goto _exit;
×
1510
  }
1511

1512
  dInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);
×
1513

1514
  // Update key expiration configuration
1515
  code = dmUpdateKeyExpiration(alterReq.days, alterReq.strategy);
×
1516
  if (code == 0) {
×
1517
    dInfo("successfully updated key expiration: %d days, strategy: %s", alterReq.days, alterReq.strategy);
×
1518
  } else {
1519
    dError("failed to update key expiration, since %s", tstrerror(code));
×
1520
  }
1521

1522
_exit:
×
1523
  tFreeSMAlterKeyExpirationReq(&alterReq);
×
1524
  pMsg->code = code;
×
1525
  pMsg->info.rsp = NULL;
×
1526
  pMsg->info.rspLen = 0;
×
1527
  return code;
×
1528
#else
1529
  dError("key expiration management is only available in enterprise edition");
1530
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1531
  pMsg->info.rsp = NULL;
1532
  pMsg->info.rspLen = 0;
1533
  return TSDB_CODE_OPS_NOT_SUPPORT;
1534
#endif
1535
}
1536

1537
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1538
  int32_t code = 0;
×
1539
  int32_t lino = 0;
×
1540
  SMsgCb *msgCb = &pMgmt->msgCb;
×
1541
  void *pTransCli = msgCb->clientRpc;
×
1542
  void *pTransStatus = msgCb->statusRpc;  
×
1543
  void *pTransSync = msgCb->syncRpc; 
×
1544
  void *pTransServer = msgCb->serverRpc;
×
1545

1546
  code = rpcReloadTlsConfig(pTransServer, TAOS_CONN_SERVER);
×
1547
  if (code != 0) {
×
1548
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(code));
×
1549
    goto _error;
×
1550
  }
1551

1552
  code = rpcReloadTlsConfig(pTransCli, TAOS_CONN_CLIENT);
×
1553
  if (code != 0) {
×
1554
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(code));
×
1555
    goto _error;
×
1556
  }
1557

1558
  code = rpcReloadTlsConfig(pTransStatus, TAOS_CONN_CLIENT);
×
1559
  if (code != 0) {
×
1560
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(code));
×
1561
    goto _error;
×
1562
  }
1563

1564
  code = rpcReloadTlsConfig(pTransSync, TAOS_CONN_CLIENT);
×
1565
  if (code != 0) {
×
1566
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(code));
×
1567
    goto _error;
×
1568
  }
1569

1570
_error:
×
1571
  
1572
  return code;
×
1573
}
1574

1575
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
184✔
1576
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
184✔
1577
  pStatus->details[0] = 0;
184✔
1578

1579
  SMonMloadInfo minfo = {0};
184✔
1580
  (*pMgmt->getMnodeLoadsFp)(&minfo);
184✔
1581
  if (minfo.isMnode &&
184✔
1582
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
184✔
1583
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1584
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
1585
    return;
×
1586
  }
1587

1588
  SMonVloadInfo vinfo = {0};
184✔
1589
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
184✔
1590
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
552✔
1591
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
368✔
1592
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
368✔
1593
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1594
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
1595
               syncStr(pLoad->syncState));
×
1596
      break;
×
1597
    }
1598
  }
1599

1600
  taosArrayDestroy(vinfo.pVloads);
184✔
1601
}
1602

1603
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
184✔
1604
  int32_t code = 0;
184✔
1605
  dDebug("server run status req is received");
184✔
1606
  SServerStatusRsp statusRsp = {0};
184✔
1607
  dmGetServerRunStatus(pMgmt, &statusRsp);
184✔
1608

1609
  pMsg->info.rsp = NULL;
184✔
1610
  pMsg->info.rspLen = 0;
184✔
1611

1612
  SRpcMsg rspMsg = {.info = pMsg->info};
184✔
1613
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
184✔
1614
  if (rspLen < 0) {
184✔
1615
    return TSDB_CODE_OUT_OF_MEMORY;
×
1616
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1617
    // return rspMsg.code;
1618
  }
1619

1620
  void *pRsp = rpcMallocCont(rspLen);
184✔
1621
  if (pRsp == NULL) {
184✔
1622
    return terrno;
×
1623
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1624
    // return rspMsg.code;
1625
  }
1626

1627
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
184✔
1628
  if (rspLen < 0) {
184✔
1629
    return TSDB_CODE_INVALID_MSG;
×
1630
  }
1631

1632
  pMsg->info.rsp = pRsp;
184✔
1633
  pMsg->info.rspLen = rspLen;
184✔
1634
  return 0;
184✔
1635
}
1636

1637
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
26,235✔
1638
  int32_t code = 0;
26,235✔
1639

1640
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
26,235✔
1641
  if (pBlock == NULL) {
26,235✔
1642
    return terrno;
×
1643
  }
1644

1645
  size_t size = 0;
26,235✔
1646

1647
  const SSysTableMeta *pMeta = NULL;
26,235✔
1648
  getInfosDbMeta(&pMeta, &size);
26,235✔
1649

1650
  int32_t index = 0;
26,235✔
1651
  for (int32_t i = 0; i < size; ++i) {
550,935✔
1652
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
550,935✔
1653
      index = i;
26,235✔
1654
      break;
26,235✔
1655
    }
1656
  }
1657

1658
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
26,235✔
1659
  if (pBlock->pDataBlock == NULL) {
26,235✔
1660
    code = terrno;
×
1661
    goto _exit;
×
1662
  }
1663

1664
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
183,645✔
1665
    SColumnInfoData colInfoData = {0};
157,410✔
1666
    colInfoData.info.colId = i + 1;
157,410✔
1667
    colInfoData.info.type = pMeta[index].schema[i].type;
157,410✔
1668
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
157,410✔
1669
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
314,820✔
1670
      code = terrno;
×
1671
      goto _exit;
×
1672
    }
1673
  }
1674

1675
  pBlock->info.hasVarCol = true;
26,235✔
1676
_exit:
26,235✔
1677
  if (code != 0) {
26,235✔
1678
    blockDataDestroy(pBlock);
×
1679
  } else {
1680
    *ppBlock = pBlock;
26,235✔
1681
  }
1682
  return code;
26,235✔
1683
}
1684

1685
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
26,235✔
1686
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL, SHOW_VAR_PRIV_ALL);
26,235✔
1687
  if (code != 0) {
26,235✔
1688
    return code;
×
1689
  }
1690

1691
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
26,235✔
1692
  if (pColInfo == NULL) {
26,235✔
1693
    return TSDB_CODE_OUT_OF_RANGE;
×
1694
  }
1695

1696
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
26,235✔
1697
}
1698

1699
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
26,235✔
1700
  int32_t           size = 0;
26,235✔
1701
  int32_t           rowsRead = 0;
26,235✔
1702
  int32_t           code = 0;
26,235✔
1703
  SRetrieveTableReq retrieveReq = {0};
26,235✔
1704
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
26,235✔
1705
    return TSDB_CODE_INVALID_MSG;
×
1706
  }
1707
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
26,235✔
1708
#if 0
1709
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
1710
    code = TSDB_CODE_MND_NO_RIGHTS;
1711
    return code;
1712
  }
1713
#endif
1714
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
26,235✔
1715
    return TSDB_CODE_INVALID_MSG;
×
1716
  }
1717

1718
  SSDataBlock *pBlock = NULL;
26,235✔
1719
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
26,235✔
1720
    return code;
×
1721
  }
1722

1723
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
26,235✔
1724
  if (code != 0) {
26,235✔
1725
    blockDataDestroy(pBlock);
×
1726
    return code;
×
1727
  }
1728

1729
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
26,235✔
1730
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
26,235✔
1731
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
26,235✔
1732

1733
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
26,235✔
1734
  if (pRsp == NULL) {
26,235✔
1735
    code = terrno;
×
1736
    dError("failed to retrieve data since %s", tstrerror(code));
×
1737
    blockDataDestroy(pBlock);
×
1738
    return code;
×
1739
  }
1740

1741
  char *pStart = pRsp->data;
26,235✔
1742
  *(int32_t *)pStart = htonl(numOfCols);
26,235✔
1743
  pStart += sizeof(int32_t);  // number of columns
26,235✔
1744

1745
  for (int32_t i = 0; i < numOfCols; ++i) {
183,645✔
1746
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
157,410✔
1747
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
157,410✔
1748

1749
    pSchema->bytes = htonl(pColInfo->info.bytes);
157,410✔
1750
    pSchema->colId = htons(pColInfo->info.colId);
157,410✔
1751
    pSchema->type = pColInfo->info.type;
157,410✔
1752

1753
    pStart += sizeof(SSysTableSchema);
157,410✔
1754
  }
1755

1756
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
26,235✔
1757
  if (len < 0) {
26,235✔
1758
    dError("failed to retrieve data since %s", tstrerror(code));
×
1759
    blockDataDestroy(pBlock);
×
1760
    rpcFreeCont(pRsp);
×
1761
    return terrno;
×
1762
  }
1763

1764
  pRsp->numOfRows = htonl(pBlock->info.rows);
26,235✔
1765
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
26,235✔
1766
  pRsp->completed = 1;
26,235✔
1767
  pMsg->info.rsp = pRsp;
26,235✔
1768
  pMsg->info.rspLen = size;
26,235✔
1769
  dDebug("dnode variables retrieve completed");
26,235✔
1770

1771
  blockDataDestroy(pBlock);
26,235✔
1772
  return TSDB_CODE_SUCCESS;
26,235✔
1773
}
1774

1775
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
19,887,975✔
1776
  SMStreamHbRspMsg rsp = {0};
19,887,975✔
1777
  int32_t          code = 0;
19,887,975✔
1778
  SDecoder         decoder;
19,879,322✔
1779
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
19,887,975✔
1780
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
19,887,975✔
1781
  int64_t          currTs = taosGetTimestampMs();
19,887,975✔
1782

1783
  if (pMsg->code) {
19,887,975✔
1784
    return streamHbHandleRspErr(pMsg->code, currTs);
243,380✔
1785
  }
1786

1787
  tDecoderInit(&decoder, (uint8_t*)msg, len);
19,644,595✔
1788
  code = tDecodeStreamHbRsp(&decoder, &rsp);
19,644,595✔
1789
  if (code < 0) {
19,644,595✔
1790
    code = TSDB_CODE_INVALID_MSG;
×
1791
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
1792
    tDecoderClear(&decoder);
×
1793
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
1794
    return streamHbHandleRspErr(code, currTs);
×
1795
  }
1796

1797
  tDecoderClear(&decoder);
19,644,595✔
1798

1799
  return streamHbProcessRspMsg(&rsp);
19,644,595✔
1800
}
1801

1802

1803
SArray *dmGetMsgHandles() {
673,257✔
1804
  int32_t code = -1;
673,257✔
1805
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
673,257✔
1806
  if (pArray == NULL) {
673,257✔
1807
    return NULL;
×
1808
  }
1809

1810
  // Requests handled by DNODE
1811
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1812
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1813
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1814
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1815
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1816
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1817
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1818
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1819
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1820
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1821
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1822
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1823
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1824
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1825
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1826
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1827
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1828
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1829

1830
  // Requests handled by MNODE
1831
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1832
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1833
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
673,257✔
1834

1835
  code = 0;
673,257✔
1836

1837
_OVER:
673,257✔
1838
  if (code != 0) {
673,257✔
1839
    taosArrayDestroy(pArray);
×
1840
    return NULL;
×
1841
  } else {
1842
    return pArray;
673,257✔
1843
  }
1844
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc