• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.27
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void setAuditDbNameToken(char *pDb, char *pToken);
35

36
#ifndef TD_ENTERPRISE
37
/* No-op fallback, compiled only when TD_ENTERPRISE is not defined: both arguments are ignored. */
void setAuditDbNameToken(char *pDb, char *pToken) {
  (void)pDb;
  (void)pToken;
}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
/* No-op fallback, compiled only when TD_ENTERPRISE is not defined: the output buffers are left untouched. */
void getAuditDbNameToken(char *pDb, char *pToken) {
  (void)pDb;
  (void)pToken;
}
44
#endif
45

46
/*
 * Status snapshot shared inside this file: dmUpdateStatusInfo() fills these
 * and dmSendStatusReq() reads them, both while holding pData->statusInfolock.
 * tsVinfo.pVloads is handed over to the status request and reset to NULL there.
 */
SMonVloadInfo tsVinfo = {0};
47
SMnodeLoad    tsMLoad = {0};
48
SDnodeData    tsDnodeData = {0};
49

50
/**
 * Adopt the dnodeId/clusterId carried by a status response when the local
 * dnode has not recorded both of them yet.
 *
 * When both local ids are already non-zero nothing happens. Otherwise the ids
 * are copied from pCfg under the pData write lock, forwarded to the monitor
 * and audit modules, and persisted through dmWriteEps().
 *
 * @param pMgmt dnode management context whose pData is updated.
 * @param pCfg  dnode configuration taken from the status response.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  // NOTE(review): the ids are tested before the write lock is taken, exactly as before.
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // A failed persist is reported at error level so it is not lost among routine info logs.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
1,767,808✔
67

68
/**
 * Compare the local ip white list version with the one reported by the mnode
 * and request a fresh list when they differ.
 *
 * When the versions match and are both 0, the white list is cleared on the
 * server rpc handle via rpcSetIpWhite(..., NULL). When they differ, a
 * TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL request carrying the local version is
 * sent to the current mnode ep set through the client rpc handle.
 *
 * @param pMgmt dnode management context.
 * @param ver   ip white list version taken from the status response.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  SRetrieveWhiteListReq req = {.ver = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for ip white list request, contLen:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
113

114

115

116
/**
 * Compare the local datetime white list version with the one reported by the
 * mnode and request a fresh list when they differ.
 *
 * When the versions differ, a TDMT_MND_RETRIEVE_DATETIME_WHITELIST request
 * carrying the local version is sent to the current mnode ep set through the
 * client rpc handle.
 *
 * @param pMgmt dnode management context.
 * @param ver   datetime white list version taken from the status response.
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this clears the *ip* white list via rpcSetIpWhite() although the
      // branch is about the time white list - looks copied from the ip variant; confirm
      // whether a dedicated time-white-list call is intended here.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->timeWhiteVer;

  SRetrieveWhiteListReq req = {.ver = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for datetime white list request, contLen:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
161

162

163

164
/**
 * Request the analytics algorithm list from the mnode when the version it
 * reported differs from the local one (taosAnalyGetVersion()).
 *
 * @param pMgmt  dnode management context.
 * @param newVer analytics version taken from the status response.
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for analysis function ver request, contLen:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  // NOTE(review): unlike the white list requests, .info.notFreeAhandle is left at 0 here.
  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
203

204
/**
 * Consume the mnode's answer to a status request.
 *
 * - Error answer: if the mnode says the dnode does not exist while the dnode
 *   is not yet marked dropped and has a positive id, mark it dropped, persist
 *   that through dmWriteEps() and raise SIGINT.
 * - Successful answer with a decodable body: refresh dnode version/config/eps
 *   when the dnode version changed, forward the audit db/token, and trigger
 *   the ip white list, time white list and analytics version checks.
 *
 * pRsp->pCont is always released before returning.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced by name below; presumably picked up by the dG* logging macros, so the name must stay.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasBody = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasBody && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      bool verChanged = (pMgmt->pData->dnodeVer != statusRsp.dnodeVer);
      if (verChanged) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }

      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken);
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }

    // Released even when decoding was skipped or failed.
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
41,223,199✔
239

240
/**
 * Build a status request from the cached dnode/vnode/mnode snapshot, send it
 * to the mnode with rpcSendRecvWithTimeout() and pass the answer to
 * dmProcessStatusRsp().
 *
 * The cached snapshot (tsDnodeData, tsVinfo, tsMLoad) is read while holding
 * pData->statusInfolock; ownership of tsVinfo.pVloads moves into the request.
 * Every early return after that point releases the request through
 * tFreeSStatusReq() so the vnode load array is not leaked.
 *
 * On a send failure nothing is processed; on TSDB_CODE_TIMEOUT_ERROR the mnode
 * ep set is rotated. On an error answer the ep set is rotated as well, and on
 * a successful answer an updated ep set is stored.
 *
 * @param pMgmt dnode management context.
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the parser's result so the log reports the real failure instead of an always-zero code.
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // The request takes over the cached vnode load array; the cache slot is emptied.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  if (tsAuditUseToken) {
    getAuditDbNameToken(req.auditDB, req.auditToken);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req, contLen:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only by-value fields of req are read here, after tFreeSStatusReq().
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // Report the send/recv failure itself; rpcRsp was not filled by a successful exchange here.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  // Called for error answers too: it releases rpcRsp.pCont in every case.
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
377

378
/**
 * Consume the mnode's answer to a config request.
 *
 * - Error answer: if the mnode says the dnode does not exist while the dnode
 *   is not yet marked dropped and has a positive id, mark it dropped, persist
 *   that through dmWriteEps() and raise SIGINT.
 * - Successful answer: when the body decodes and the config version is not
 *   verified, persist the received global config, apply it to tsCfg and call
 *   setAllConfigs(); any failure in that chain aborts the rest. Afterwards the
 *   local config is persisted and tsConfigInited is set to 1.
 *
 * configRsp and pRsp->pCont are always released before returning.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced by name below; presumably picked up by the dG* logging macros, so the name must stay.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
      tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
    // Try to use cfg from mnode sdb.
    if (!configRsp.isVersionVerified) {
      uInfo("config version not verified, update config");
      code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to update config since %s", tstrerror(code));
        goto _exit;
      }
      code = setAllConfigs(tsCfg);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to set all configs since %s", tstrerror(code));
        goto _exit;
      }
    }
  }

  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    // Logged only: the config is still marked initialized below.
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
554,522✔
434

435
/**
 * Consume the mnode's answer to a key sync request.
 *
 * Fails with the answer's own code when the mnode reported an error, with
 * TSDB_CODE_INVALID_MSG when the body is empty, and with the decoder's code
 * when the body cannot be deserialized. Otherwise tsEncryptKeysStatus is
 * refreshed and, when the answer asks for an update and the build defines both
 * TD_ENTERPRISE and TD_HAS_TAOSK, the received keys are written to
 * master.bin/derived.bin under <tsDataDir>/dnode/config, copied into the
 * global key variables, and existing config files are encrypted (a failure of
 * that last step is only logged).
 *
 * pRsp->pCont is always released before returning.
 *
 * @return 0 on success, otherwise the error code described above.
 */
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  const STraceId *trace = &pRsp->info.traceId;
  SKeySyncRsp     keySyncRsp = {0};
  int32_t         code = pRsp->code;

  if (code != 0) {
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
    goto _exit;
  }

  if (!(pRsp->pCont != NULL && pRsp->contLen > 0)) {
    dError("invalid key sync response, empty content");
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
  if (code != 0) {
    dError("failed to deserialize key sync response since %s", tstrerror(code));
    goto _exit;
  }

  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
        tsLocalKeyVersion, keySyncRsp.needUpdate);
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;

  if (!keySyncRsp.needUpdate) {
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
  } else {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
    // Both key files live under <tsDataDir>/dnode/config.
    char masterPath[PATH_MAX] = {0};
    char derivedPath[PATH_MAX] = {0};
    snprintf(masterPath, sizeof(masterPath), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);
    snprintf(derivedPath, sizeof(derivedPath), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);

    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
          masterPath, derivedPath, tsLocalKeyVersion, keySyncRsp.keyVersion);

    // Persist first; the in-memory globals are only touched once the files are written.
    // The same algorithm value is passed three times (cfg and meta keys reuse it).
    code = taoskSaveEncryptKeys(masterPath, derivedPath, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey,
                                keySyncRsp.metaKey, keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm,
                                keySyncRsp.algorithm, keySyncRsp.keyVersion, keySyncRsp.createTime,
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
    if (code != 0) {
      dError("failed to save encryption keys since %s", tstrerror(code));
      goto _exit;
    }

    // Mirror the synced keys and their metadata into the globals.
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;

    tsLocalKeyVersion = keySyncRsp.keyVersion;
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);

    // Best effort: a failure here does not fail the key sync.
    code = taosEncryptExistingCfgFiles(tsDataDir);
    if (code != 0) {
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
      code = 0;
    }
#else
    dWarn("enterprise features not enabled, skipping key sync");
#endif
  }

  code = TSDB_CODE_SUCCESS;

_exit:
  rpcFreeCont(pRsp->pCont);
  return code;
}
521

522
/**
 * Send the local key version to the mnode with rpcSendRecvWithTimeout() and
 * pass a successful answer to dmProcessKeySyncRsp().
 *
 * An error answer is logged and its content released here, because
 * dmProcessKeySyncRsp() (which normally frees it) is not called in that case.
 *
 * @param pMgmt dnode management context.
 */
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
  int32_t     code = 0;
  SKeySyncReq req = {0};

  req.dnodeId = pMgmt->pData->dnodeId;
  req.keyVersion = tsLocalKeyVersion;
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so it is printed as such.
    dError("failed to malloc cont for key sync req, contLen:%d", contLen);
    return;
  }
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_KEY_SYNC,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;  // filled by the rpc call; not acted upon in this function
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
  if (code != 0) {
    dError("failed to process key sync rsp since %s", tstrerror(code));
  }
}
578

579
/**
 * Send the local config version and global config items to the mnode with
 * rpcSendRecvWithTimeout() and pass a successful answer to
 * dmProcessConfigRsp().
 *
 * An error answer is logged and its content released here, because
 * dmProcessConfigRsp() (which normally frees it) is not called in that case.
 *
 * @param pMgmt dnode management context.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = true;
  // NOTE(review): req.array is never released in this function - presumably it is
  // borrowed from tsCfg; confirm against taosGetGlobalCfg().
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so it is printed as such.
    dError("failed to malloc cont for config req, contLen:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;  // filled by the rpc call; not acted upon in this function
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
632

633
/**
 * Refresh the cached status snapshot (tsDnodeData, tsVinfo, tsMLoad) that
 * dmSendStatusReq() later turns into a status request.
 *
 * The dnode fields are copied from pData under its read lock, vnode and mnode
 * loads are collected through the registered callbacks, and the results are
 * published while holding pData->statusInfolock. A freshly collected vnode
 * load array is only stored when the cache slot is empty; otherwise it is
 * destroyed and the cached one is kept.
 *
 * @param pMgmt dnode management context.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected array cannot be published, so release it instead of leaking it.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache slot is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // A previous array has not been consumed yet: keep it and drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
680

681
// Serialize pReq and send it to the mnode as a TDMT_MND_NOTIFY message.
//
// The message is marked info.noResp = 1, and the target endpoint set is taken
// from dmGetMnodeEpSet(). Every failure (serialization, allocation, send) is
// only logged; nothing is reported back to the caller.
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // Called with a NULL buffer first; the returned value is used as the size
  // of the rpc buffer allocated below.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to send notify req since rpc cont allocation failed, len:%d", contLen);
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
710

711
// Handler for an auth response. Neither argument is examined: the handler
// logs that the message is unsupported and reports success (0).
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t code = 0;

  dError("auth rsp is received, but not supported yet");

  return code;
}
715

716
// Handler for a grant response. Neither argument is examined: the handler
// logs that the message is unsupported and reports success (0).
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t code = 0;

  dError("grant rsp is received, but not supported yet");

  return code;
}
720

721
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
73,133✔
722
  int32_t       code = 0;
73,133✔
723
  SDCfgDnodeReq cfgReq = {0};
73,133✔
724
  SConfig      *pCfg = taosGetCfg();
73,133✔
725
  SConfigItem  *pItem = NULL;
73,133✔
726

727
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
73,133✔
728
    return TSDB_CODE_INVALID_MSG;
×
729
  }
730
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
73,133✔
731
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,228✔
732
  }
733

734
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
69,905✔
735

736
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
69,905✔
737
  if (code != 0) {
69,905✔
738
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
188✔
739
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
188✔
740
      return TSDB_CODE_SUCCESS;
188✔
741
    } else {
742
      return code;
×
743
    }
744
  }
745
  if (pItem == NULL) {
69,717✔
746
    return TSDB_CODE_CFG_NOT_FOUND;
×
747
  }
748

749
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
69,717✔
750
    char value[10] = {0};
×
751
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
752
      tsSyncTimeout = 0;
×
753
    }
754

755
    if (tsSyncTimeout > 0) {
×
756
      SConfigItem *pItemTmp = NULL;
×
757
      char         tmp[10] = {0};
×
758

759
      sprintf(tmp, "%d", tsSyncTimeout);
×
760
      TAOS_CHECK_RETURN(
×
761
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
762
      if (pItemTmp == NULL) {
×
763
        return TSDB_CODE_CFG_NOT_FOUND;
×
764
      }
765

766
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
767
      TAOS_CHECK_RETURN(
×
768
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
769
      if (pItemTmp == NULL) {
×
770
        return TSDB_CODE_CFG_NOT_FOUND;
×
771
      }
772
      TAOS_CHECK_RETURN(
×
773
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
774
      if (pItemTmp == NULL) {
×
775
        return TSDB_CODE_CFG_NOT_FOUND;
×
776
      }
777

778
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
779
      TAOS_CHECK_RETURN(
×
780
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
781
      if (pItemTmp == NULL) {
×
782
        return TSDB_CODE_CFG_NOT_FOUND;
×
783
      }
784
      TAOS_CHECK_RETURN(
×
785
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
786
      if (pItemTmp == NULL) {
×
787
        return TSDB_CODE_CFG_NOT_FOUND;
×
788
      }
789
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
790
      if (pItemTmp == NULL) {
×
791
        return TSDB_CODE_CFG_NOT_FOUND;
×
792
      }
793

794
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
795
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
796
      if (pItemTmp == NULL) {
×
797
        return TSDB_CODE_CFG_NOT_FOUND;
×
798
      }
799

800
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
801
      TAOS_CHECK_RETURN(
×
802
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
803
      if (pItemTmp == NULL) {
×
804
        return TSDB_CODE_CFG_NOT_FOUND;
×
805
      }
806
      TAOS_CHECK_RETURN(
×
807
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
808
      if (pItemTmp == NULL) {
×
809
        return TSDB_CODE_CFG_NOT_FOUND;
×
810
      }
811
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
812
      if (pItemTmp == NULL) {
×
813
        return TSDB_CODE_CFG_NOT_FOUND;
×
814
      }
815

816
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
817
            tsSyncTimeout);
818
    }
819
  }
820

821
  if (!isConifgItemLazyMode(pItem)) {
69,717✔
822
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
69,127✔
823

824
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
69,127✔
825
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
826
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
827
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
828

829
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
830
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
831
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
832
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
833

834
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
835
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
837

838
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
839
            tsSyncTimeout);
840
    }
841
  }
842

843
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
69,717✔
844
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
9,633✔
845
    if (code != TSDB_CODE_SUCCESS) {
9,633✔
846
      dError("failed to persist global config since %s", tstrerror(code));
×
847
    }
848
  } else {
849
    code = taosPersistLocalConfig(pMgmt->path);
60,084✔
850
    if (code != TSDB_CODE_SUCCESS) {
60,084✔
851
      dError("failed to persist local config since %s", tstrerror(code));
×
852
    }
853
  }
854

855
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
69,717✔
856
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
857

858
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
859
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
860
  }
861

862
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
69,717✔
863
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
69,717✔
864
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
865
  }
866

867
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
69,717✔
868
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
69,717✔
869
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
870
  }
871

872
  if (cfgReq.version > 0) {
69,717✔
873
    tsdmConfigVersion = cfgReq.version;
15,529✔
874
  }
875
  return code;
69,717✔
876
}
877

878
// Handle a create-encrypt-key request carried in pMsg (an SDCfgDnodeReq).
//
// Enterprise builds: the key in cfgReq.value is handed to
// dmUpdateEncryptKey(); on success the global key, its checksum and the
// "loaded" status are updated. The result is stored in pMsg->code, the
// response payload is cleared, and the same code is returned.
// Other builds: returns 0 without touching pMsg.
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq cfgReq = {0};
  // Stays at INVALID_MSG unless deserialization succeeds.
  int32_t code = TSDB_CODE_INVALID_MSG;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) == 0) {
    code = dmUpdateEncryptKey(cfgReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  return 0;
#endif
}
903

904
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
905
// Verification plaintext used to validate encryption keys
906
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
907

908
// Save key verification file with encrypted plaintext for each key
909
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
802✔
910
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
911
                                     int32_t metaAlgorithm) {
912
  char    verifyFile[PATH_MAX] = {0};
802✔
913
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
802✔
914
                            TD_DIRSEP, TD_DIRSEP);
915
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
802✔
916
    dError("failed to build key verification file path");
×
917
    return TSDB_CODE_OUT_OF_BUFFER;
×
918
  }
919

920
  int32_t     code = 0;
802✔
921
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
802✔
922
  int32_t     plaintextLen = strlen(plaintext);
802✔
923

924
  // Array of keys and their algorithms
925
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
802✔
926
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
802✔
927
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
802✔
928

929
  // Calculate total buffer size
930
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
802✔
931
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
802✔
932
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
802✔
933
  int32_t totalSize = headerSize + perKeySize * 5;
802✔
934

935
  // Allocate buffer for all data
936
  char *buffer = taosMemoryMalloc(totalSize);
802✔
937
  if (buffer == NULL) {
802✔
938
    dError("failed to allocate memory for key verification buffer");
×
939
    return terrno;
×
940
  }
941

942
  char *ptr = buffer;
802✔
943

944
  // Write magic number and version to buffer
945
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
802✔
946
  uint16_t version = 1;
802✔
947
  memcpy(ptr, &magic, sizeof(magic));
802✔
948
  ptr += sizeof(magic);
802✔
949
  memcpy(ptr, &version, sizeof(version));
802✔
950
  ptr += sizeof(version);
802✔
951

952
  // Encrypt all keys and write to buffer
953
  char paddedPlaintext[512] = {0};
802✔
954
  memcpy(paddedPlaintext, plaintext, plaintextLen);
802✔
955

956
  for (int i = 0; i < 5; i++) {
4,812✔
957
    char encrypted[512] = {0};
4,010✔
958

959
    // Encrypt the verification plaintext with this key using CBC
960
    SCryptOpts opts = {0};
4,010✔
961
    opts.len = encryptedLen;
4,010✔
962
    opts.source = paddedPlaintext;
4,010✔
963
    opts.result = encrypted;
4,010✔
964
    opts.unitLen = 16;
4,010✔
965
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
4,010✔
966
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
4,010✔
967

968
    int32_t count = CBC_Encrypt(&opts);
4,010✔
969
    if (count != opts.len) {
4,010✔
970
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
UNCOV
971
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
972
             tstrerror(code));
973
      taosMemoryFree(buffer);
×
UNCOV
974
      return code;
×
975
    }
976

977
    // Write to buffer: algorithm + encrypted length + encrypted data
978
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
4,010✔
979
    ptr += sizeof(int32_t);
4,010✔
980
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
4,010✔
981
    ptr += sizeof(int32_t);
4,010✔
982
    memcpy(ptr, encrypted, encryptedLen);
4,010✔
983
    ptr += encryptedLen;
4,010✔
984

985
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
4,010✔
986
  }
987

988
  // Write all data to file in one operation
989
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
802✔
990
  if (pFile == NULL) {
802✔
991
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
992
    taosMemoryFree(buffer);
×
UNCOV
993
    return TSDB_CODE_FILE_CORRUPTED;
×
994
  }
995

996
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
802✔
997
  (void)taosCloseFile(&pFile);
802✔
998
  taosMemoryFree(buffer);
802✔
999

1000
  if (written != totalSize) {
802✔
1001
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
UNCOV
1002
    return TSDB_CODE_FILE_CORRUPTED;
×
1003
  }
1004

1005
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
802✔
1006
  return 0;
802✔
1007
}
1008

1009
// Verify all encryption keys by decrypting and comparing with original plaintext
1010
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
802✔
1011
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1012
                                      int32_t metaAlgorithm) {
1013
  char    verifyFile[PATH_MAX] = {0};
802✔
1014
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
802✔
1015
                            TD_DIRSEP, TD_DIRSEP);
1016
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
802✔
1017
    dError("failed to build key verification file path");
×
UNCOV
1018
    return TSDB_CODE_OUT_OF_BUFFER;
×
1019
  }
1020

1021
  // Get file size
1022
  int64_t fileSize = 0;
802✔
1023
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
802✔
1024
    // File doesn't exist, create it with current keys
1025
    dInfo("key verification file not found, creating new one");
802✔
1026
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
802✔
1027
  }
1028

1029
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1030
    dError("invalid key verification file size: %" PRId64, fileSize);
×
UNCOV
1031
    return TSDB_CODE_FILE_CORRUPTED;
×
1032
  }
1033

1034
  // Allocate buffer and read entire file
1035
  char *buffer = taosMemoryMalloc(fileSize);
×
1036
  if (buffer == NULL) {
×
1037
    dError("failed to allocate memory for key verification buffer");
×
UNCOV
1038
    return terrno;
×
1039
  }
1040

1041
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1042
  if (pFile == NULL) {
×
1043
    dError("failed to open key verification file:%s", verifyFile);
×
1044
    taosMemoryFree(buffer);
×
UNCOV
1045
    return TSDB_CODE_FILE_CORRUPTED;
×
1046
  }
1047

1048
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
UNCOV
1049
  (void)taosCloseFile(&pFile);
×
1050

1051
  if (bytesRead != fileSize) {
×
1052
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1053
    taosMemoryFree(buffer);
×
UNCOV
1054
    return TSDB_CODE_FILE_CORRUPTED;
×
1055
  }
1056

1057
  int32_t     code = 0;
×
1058
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1059
  int32_t     plaintextLen = strlen(plaintext);
×
UNCOV
1060
  const char *ptr = buffer;
×
1061

1062
  // Parse and verify header
1063
  uint32_t magic = 0;
×
1064
  uint16_t version = 0;
×
1065
  memcpy(&magic, ptr, sizeof(magic));
×
1066
  ptr += sizeof(magic);
×
1067
  memcpy(&version, ptr, sizeof(version));
×
UNCOV
1068
  ptr += sizeof(version);
×
1069

1070
  if (magic != 0x544B5659) {
×
1071
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1072
    taosMemoryFree(buffer);
×
UNCOV
1073
    return TSDB_CODE_FILE_CORRUPTED;
×
1074
  }
1075

1076
  // Array of keys and their algorithms
1077
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1078
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
UNCOV
1079
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1080

1081
  // Verify each key from buffer
UNCOV
1082
  for (int i = 0; i < 5; i++) {
×
1083
    // Check if we have enough data remaining
1084
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1085
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1086
      taosMemoryFree(buffer);
×
UNCOV
1087
      return TSDB_CODE_FILE_CORRUPTED;
×
1088
    }
1089

1090
    int32_t savedAlgo = 0;
×
UNCOV
1091
    int32_t encryptedLen = 0;
×
1092

1093
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1094
    ptr += sizeof(int32_t);
×
1095
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
UNCOV
1096
    ptr += sizeof(int32_t);
×
1097

1098
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1099
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1100
      taosMemoryFree(buffer);
×
UNCOV
1101
      return TSDB_CODE_FILE_CORRUPTED;
×
1102
    }
1103

1104
    if (ptr - buffer + encryptedLen > fileSize) {
×
1105
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1106
      taosMemoryFree(buffer);
×
UNCOV
1107
      return TSDB_CODE_FILE_CORRUPTED;
×
1108
    }
1109

1110
    uint8_t encrypted[512] = {0};
×
1111
    memcpy(encrypted, ptr, encryptedLen);
×
UNCOV
1112
    ptr += encryptedLen;
×
1113

1114
    // Decrypt with current key using CBC
UNCOV
1115
    char decrypted[512] = {0};
×
1116

1117
    SCryptOpts opts = {0};
×
1118
    opts.len = encryptedLen;
×
1119
    opts.source = (char *)encrypted;
×
1120
    opts.result = decrypted;
×
1121
    opts.unitLen = 16;
×
NEW
1122
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
UNCOV
1123
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1124

1125
    int32_t count = CBC_Decrypt(&opts);
×
1126
    if (count != opts.len) {
×
1127
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
UNCOV
1128
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1129
             count, opts.len, tstrerror(code));
1130
      taosMemoryFree(buffer);
×
UNCOV
1131
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1132
    }
1133

1134
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1135
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1136
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1137
      taosMemoryFree(buffer);
×
UNCOV
1138
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1139
    }
1140

UNCOV
1141
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1142
  }
1143

1144
  taosMemoryFree(buffer);
×
1145
  dInfo("all encryption keys verified successfully");
×
UNCOV
1146
  return 0;
×
1147
}
1148

1149
// Public API: Verify and initialize encryption keys at startup
1150
int32_t dmVerifyAndInitEncryptionKeys(void) {
556,065✔
1151
  // Skip verification in dump sdb mode (taosd -s)
1152
  if (tsSkipKeyCheckMode) {
556,065✔
1153
    dInfo("skip encryption key verification in some special check mode");
×
UNCOV
1154
    return 0;
×
1155
  }
1156

1157
  // Check if encryption keys are loaded
1158
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
556,065✔
1159
    dDebug("encryption keys not loaded, skipping verification");
555,263✔
1160
    return 0;
555,263✔
1161
  }
1162

1163
  // Get key file paths
1164
  char    masterKeyFile[PATH_MAX] = {0};
802✔
1165
  char    derivedKeyFile[PATH_MAX] = {0};
802✔
1166
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
802✔
1167
                            TD_DIRSEP, TD_DIRSEP);
1168
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
802✔
1169
    dError("failed to build master key file path");
×
UNCOV
1170
    return TSDB_CODE_OUT_OF_BUFFER;
×
1171
  }
1172

1173
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
802✔
1174
                    TD_DIRSEP, TD_DIRSEP);
1175
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
802✔
1176
    dError("failed to build derived key file path");
×
UNCOV
1177
    return TSDB_CODE_OUT_OF_BUFFER;
×
1178
  }
1179

1180
  // Load encryption keys
1181
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
802✔
1182
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
802✔
1183
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
802✔
1184
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
802✔
1185
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
802✔
1186
  int32_t algorithm = 0;
802✔
1187
  int32_t cfgAlgorithm = 0;
802✔
1188
  int32_t metaAlgorithm = 0;
802✔
1189
  int32_t fileVersion = 0;
802✔
1190
  int32_t keyVersion = 0;
802✔
1191
  int64_t createTime = 0;
802✔
1192
  int64_t svrKeyUpdateTime = 0;
802✔
1193
  int64_t dbKeyUpdateTime = 0;
802✔
1194

1195
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
802✔
1196
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1197
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1198
  if (code != 0) {
802✔
1199
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
UNCOV
1200
    return code;
×
1201
  }
1202

1203
  // Verify all keys
1204
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
802✔
1205
  if (code != 0) {
802✔
1206
    dError("encryption key verification failed, since %s", tstrerror(code));
×
UNCOV
1207
    return code;
×
1208
  }
1209

1210
  dInfo("encryption keys verified and initialized successfully");
802✔
1211
  return 0;
802✔
1212
}
1213
#else
1214
// Fallback used when TD_ENTERPRISE / TD_HAS_TAOSK are not both defined
// (community edition or no TaosK support): there is nothing to verify,
// so success is always reported.
int32_t dmVerifyAndInitEncryptionKeys(void) {
  const int32_t code = 0;
  return code;
}
1218
#endif
1219

1220
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1221
static int32_t dmUpdateSvrKey(const char *newKey) {
×
1222
  if (newKey == NULL || newKey[0] == '\0') {
×
1223
    dError("invalid new SVR_KEY, key is empty");
×
UNCOV
1224
    return TSDB_CODE_INVALID_PARA;
×
1225
  }
1226

1227
  char masterKeyFile[PATH_MAX] = {0};
×
UNCOV
1228
  char derivedKeyFile[PATH_MAX] = {0};
×
1229

1230
  // Build path to key files
UNCOV
1231
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
1232
                            TD_DIRSEP, TD_DIRSEP);
1233
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
×
1234
    dError("failed to build master key file path");
×
UNCOV
1235
    return TSDB_CODE_OUT_OF_BUFFER;
×
1236
  }
1237

UNCOV
1238
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
1239
                    TD_DIRSEP, TD_DIRSEP);
1240
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
×
1241
    dError("failed to build derived key file path");
×
UNCOV
1242
    return TSDB_CODE_OUT_OF_BUFFER;
×
1243
  }
1244

1245
  // Load current keys
1246
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1247
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1248
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1249
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1250
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1251
  int32_t algorithm = 0;
×
1252
  int32_t cfgAlgorithm = 0;
×
1253
  int32_t metaAlgorithm = 0;
×
1254
  int32_t fileVersion = 0;
×
1255
  int32_t keyVersion = 0;
×
1256
  int64_t createTime = 0;
×
1257
  int64_t svrKeyUpdateTime = 0;
×
UNCOV
1258
  int64_t dbKeyUpdateTime = 0;
×
1259

1260
  int32_t code =
UNCOV
1261
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
1262
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1263
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1264
  if (code != 0) {
×
1265
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
UNCOV
1266
    return code;
×
1267
  }
1268

1269
  // Update SVR_KEY
1270
  int64_t now = taosGetTimestampMs();
×
UNCOV
1271
  int32_t newKeyVersion = keyVersion + 1;
×
1272

1273
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
×
1274
  tstrncpy(svrKey, newKey, sizeof(svrKey));
×
UNCOV
1275
  svrKeyUpdateTime = now;
×
1276

1277
  // Save updated keys (use algorithm for all keys for backward compatibility)
UNCOV
1278
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
×
1279
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1280
  if (code != 0) {
×
1281
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
UNCOV
1282
    return code;
×
1283
  }
1284

1285
  // Update key verification file with new SVR_KEY
1286
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
×
1287
  if (code != 0) {
×
UNCOV
1288
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1289
    // Don't fail the operation if verification file update fails
1290
  }
1291

1292
  // Update global variables
1293
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
1294
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
1295
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
1296
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
1297
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
1298
  tsEncryptAlgorithmType = algorithm;
×
1299
  tsEncryptFileVersion = fileVersion;
×
1300
  tsEncryptKeyVersion = newKeyVersion;
×
1301
  tsEncryptKeyCreateTime = createTime;
×
1302
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
UNCOV
1303
  tsDbKeyUpdateTime = dbKeyUpdateTime;
×
1304

1305
  // Update encryption key status for backward compatibility
1306
  int keyLen = strlen(tsDataKey);
×
1307
  if (keyLen > ENCRYPT_KEY_LEN) {
×
UNCOV
1308
    keyLen = ENCRYPT_KEY_LEN;
×
1309
  }
1310
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
×
1311
  memcpy(tsEncryptKey, tsDataKey, keyLen);
×
1312
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
×
UNCOV
1313
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
×
1314

1315
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
×
UNCOV
1316
  return 0;
×
1317
}
1318

NEW
1319
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
NEW
1320
  if (days < 0) {
×
NEW
1321
    dError("invalid days value:%d, must be >= 0", days);
×
NEW
1322
    return TSDB_CODE_INVALID_PARA;
×
1323
  }
1324

NEW
1325
  if (strategy == NULL || strategy[0] == '\0') {
×
NEW
1326
    dError("invalid strategy, strategy is empty");
×
NEW
1327
    return TSDB_CODE_INVALID_PARA;
×
1328
  }
1329

1330
  // Validate strategy value
NEW
1331
  if (strcmp(strategy, "ALARM") != 0) {
×
NEW
1332
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1333
  }
1334

1335
  // Update global variables directly
NEW
1336
  tsKeyExpirationDays = days;
×
NEW
1337
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1338

NEW
1339
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
NEW
1340
  return 0;
×
1341
}
1342

1343
static int32_t dmUpdateDbKey(const char *newKey) {
×
1344
  if (newKey == NULL || newKey[0] == '\0') {
×
1345
    dError("invalid new DB_KEY, key is empty");
×
UNCOV
1346
    return TSDB_CODE_INVALID_PARA;
×
1347
  }
1348

1349
  char masterKeyFile[PATH_MAX] = {0};
×
1350
  char derivedKeyFile[PATH_MAX] = {0};
×
1351

1352
  // Build path to key files
UNCOV
1353
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
1354
                            TD_DIRSEP, TD_DIRSEP);
1355
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
×
UNCOV
1356
    dError("failed to build master key file path");
×
UNCOV
1357
    return TSDB_CODE_OUT_OF_BUFFER;
×
1358
  }
1359

1360
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
1361
                    TD_DIRSEP, TD_DIRSEP);
1362
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
×
1363
    dError("failed to build derived key file path");
×
UNCOV
1364
    return TSDB_CODE_OUT_OF_BUFFER;
×
1365
  }
1366

1367
  // Load current keys
1368
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1369
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
UNCOV
1370
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
UNCOV
1371
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1372
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
1373
  int32_t algorithm = 0;
×
UNCOV
1374
  int32_t cfgAlgorithm = 0;
×
UNCOV
1375
  int32_t metaAlgorithm = 0;
×
1376
  int32_t fileVersion = 0;
×
UNCOV
1377
  int32_t keyVersion = 0;
×
1378
  int64_t createTime = 0;
×
1379
  int64_t svrKeyUpdateTime = 0;
×
1380
  int64_t dbKeyUpdateTime = 0;
×
1381

1382
  int32_t code =
1383
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
1384
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1385
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1386
  if (code != 0) {
×
1387
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
UNCOV
1388
    return code;
×
1389
  }
1390

1391
  // Update DB_KEY
1392
  int64_t now = taosGetTimestampMs();
×
1393
  int32_t newKeyVersion = keyVersion + 1;
×
1394

1395
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
×
1396
  tstrncpy(dbKey, newKey, sizeof(dbKey));
×
1397
  dbKeyUpdateTime = now;
×
1398

1399
  // Save updated keys (use algorithm for all keys for backward compatibility)
1400
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
×
1401
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1402
  if (code != 0) {
×
1403
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
UNCOV
1404
    return code;
×
1405
  }
1406

1407
  // Update key verification file with new DB_KEY
UNCOV
1408
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
×
1409
  if (code != 0) {
×
1410
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1411
    // Don't fail the operation if verification file update fails
1412
  }
1413

1414
  // Update global variables
1415
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
1416
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
UNCOV
1417
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
1418
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
1419
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
1420
  tsEncryptAlgorithmType = algorithm;
×
UNCOV
1421
  tsEncryptFileVersion = fileVersion;
×
UNCOV
1422
  tsEncryptKeyVersion = newKeyVersion;
×
1423
  tsEncryptKeyCreateTime = createTime;
×
UNCOV
1424
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
1425
  tsDbKeyUpdateTime = dbKeyUpdateTime;
×
1426

1427
  // Update encryption key status for backward compatibility
UNCOV
1428
  int keyLen = strlen(tsDataKey);
×
UNCOV
1429
  if (keyLen > ENCRYPT_KEY_LEN) {
×
UNCOV
1430
    keyLen = ENCRYPT_KEY_LEN;
×
1431
  }
1432
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
×
1433
  memcpy(tsEncryptKey, tsDataKey, keyLen);
×
UNCOV
1434
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
×
UNCOV
1435
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
×
1436

UNCOV
1437
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
×
1438
  return 0;
×
1439
}
1440
#endif
1441

1442
// Handles a request to replace one of the encryption keys.
//
// The request is decoded from pMsg->pCont; keyType 0 updates SVR_KEY through
// dmUpdateSvrKey() and keyType 1 updates DB_KEY through dmUpdateDbKey(). Any
// other keyType is rejected with TSDB_CODE_INVALID_PARA. The outcome is stored
// in pMsg->code and also returned; no response body is attached.
//
// When the build lacks TD_ENTERPRISE or TD_HAS_TAOSK the request is always
// answered with TSDB_CODE_OPS_NOT_SUPPORT.
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  SMAlterEncryptKeyReq req = {0};
  int32_t              ret = 0;

  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    ret = TSDB_CODE_INVALID_MSG;
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(ret));
  } else {
    dInfo("received alter encrypt key req, keyType:%d", req.keyType);

    switch (req.keyType) {
      case 0: {
        // SVR_KEY
        ret = dmUpdateSvrKey(req.newKey);
        if (ret != 0) {
          dError("failed to update SVR_KEY, since %s", tstrerror(ret));
        } else {
          dInfo("successfully updated SVR_KEY");
        }
        break;
      }
      case 1: {
        // DB_KEY
        ret = dmUpdateDbKey(req.newKey);
        if (ret != 0) {
          dError("failed to update DB_KEY, since %s", tstrerror(ret));
        } else {
          dInfo("successfully updated DB_KEY");
        }
        break;
      }
      default: {
        dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", req.keyType);
        ret = TSDB_CODE_INVALID_PARA;
        break;
      }
    }
  }

  // The request is released on every path, including a failed decode.
  tFreeSMAlterEncryptKeyReq(&req);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  pMsg->code = ret;
  return ret;
#else
  dError("encryption key management is only available in enterprise edition");
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1490

NEW
1491
// Handles a request to change the key expiration settings.
//
// The request is decoded from pMsg->pCont and its `days` and `strategy` fields
// are forwarded to dmUpdateKeyExpiration(). The outcome is stored in
// pMsg->code and also returned; no response body is attached.
//
// When the build lacks TD_ENTERPRISE or TD_HAS_TAOSK the request is always
// answered with TSDB_CODE_OPS_NOT_SUPPORT.
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  SMAlterKeyExpirationReq req = {0};
  int32_t                 ret = 0;

  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    ret = TSDB_CODE_INVALID_MSG;
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(ret));
  } else {
    dInfo("received alter key expiration req, days:%d, strategy:%s", req.days, req.strategy);

    ret = dmUpdateKeyExpiration(req.days, req.strategy);
    if (ret != 0) {
      dError("failed to update key expiration, since %s", tstrerror(ret));
    } else {
      dInfo("successfully updated key expiration: %d days, strategy: %s", req.days, req.strategy);
    }
  }

  // The request is released on every path, including a failed decode.
  tFreeSMAlterKeyExpirationReq(&req);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  pMsg->code = ret;
  return ret;
#else
  dError("key expiration management is only available in enterprise edition");
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1525

1526
// Reloads the TLS configuration of every RPC transport referenced by
// pMgmt->msgCb.
//
// Transports are processed in a fixed order: server, client, status client,
// sync client. Processing stops at the first failure, which is logged and
// returned; transports after the failing one are left untouched. Returns 0
// when all four reloads succeed. pMsg is not read or modified.
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMsgCb *msgCb = &pMgmt->msgCb;

  // One row per transport: handle, connection type and the label used in logs.
  const struct {
    void       *pTrans;
    int32_t     connType;
    const char *label;
  } transports[] = {
      {msgCb->serverRpc, TAOS_CONN_SERVER, "server"},
      {msgCb->clientRpc, TAOS_CONN_CLIENT, "cli"},
      {msgCb->statusRpc, TAOS_CONN_CLIENT, "status-cli"},
      {msgCb->syncRpc, TAOS_CONN_CLIENT, "sync-cli"},
  };
  const size_t numOfTransports = sizeof(transports) / sizeof(transports[0]);

  for (size_t i = 0; i < numOfTransports; ++i) {
    int32_t code = rpcReloadTlsConfig(transports[i].pTrans, transports[i].connType);
    if (code != 0) {
      dError("failed to reload tls config for transport %s since %s", transports[i].label, tstrerror(code));
      return code;
    }
  }

  return 0;
}
1563

1564
// Fills pStatus with the current service state of this dnode.
//
// The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
// downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a human-readable reason
// in pStatus->details, when either
//   - the mnode load callback reports an mnode whose sync state is ERROR or
//     OFFLINE (vnodes are then not inspected), or
//   - any vnode reported by the vnode load callback is in one of those states
//     (only the first such vnode is reported).
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);
  if (minfo.isMnode &&
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
    return;
  }

  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);

  // Read the element count once so the loop compares two signed values.
  const int32_t numOfVnodes = (int32_t)taosArrayGetSize(vinfo.pVloads);
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
    if (pLoad == NULL) {
      // Never dereference a missing element; skip it instead.
      continue;
    }

    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
               syncStr(pLoad->syncState));
      break;
    }
  }

  // The array handed back by the vnode load callback is released here.
  taosArrayDestroy(vinfo.pVloads);
}
1591

1592
// Answers a server-run-status request.
//
// Collects the status through dmGetServerRunStatus(), serializes it into a
// buffer obtained from rpcMallocCont() and attaches that buffer to
// pMsg->info.rsp / pMsg->info.rspLen.
//
// Returns 0 on success. On failure pMsg->info.rsp stays NULL and rspLen stays
// 0, and the return value is:
//   - TSDB_CODE_OUT_OF_MEMORY if the size-probing serialization fails,
//   - terrno if the response buffer cannot be allocated,
//   - TSDB_CODE_INVALID_MSG if the final serialization fails.
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");

  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never attached to pMsg, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
1625

1626
// Allocates an empty data block whose columns follow the schema of the
// TSDB_INS_TABLE_DNODE_VARIABLES system table.
//
// The schema is looked up by name in the table list returned by
// getInfosDbMeta(). If no entry matches, the lookup position stays 0 and the
// first entry's schema is used. Column ids are assigned 1..colNum in schema
// order.
//
// On success the new block is stored in *ppBlock and 0 is returned. On failure
// the partially built block is destroyed, *ppBlock is left untouched and the
// error code taken from terrno is returned.
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  int32_t code = 0;

  SSDataBlock *pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  const SSysTableMeta *pTables = NULL;
  size_t               numOfTables = 0;
  getInfosDbMeta(&pTables, &numOfTables);

  // Locate the dnode-variables table; position 0 is the fall-back.
  int32_t pos = 0;
  for (int32_t t = 0; t < numOfTables; ++t) {
    if (strcmp(pTables[t].name, TSDB_INS_TABLE_DNODE_VARIABLES) != 0) {
      continue;
    }
    pos = t;
    break;
  }
  const SSysTableMeta *pVarMeta = &pTables[pos];

  pNew->pDataBlock = taosArrayInit(pVarMeta->colNum, sizeof(SColumnInfoData));
  if (pNew->pDataBlock == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t col = 0; col < pVarMeta->colNum; ++col) {
    SColumnInfoData column = {0};
    column.info.bytes = pVarMeta->schema[col].bytes;
    column.info.type = pVarMeta->schema[col].type;
    column.info.colId = col + 1;

    if (taosArrayPush(pNew->pDataBlock, &column) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  pNew->info.hasVarCol = true;

_exit:
  if (code == 0) {
    *ppBlock = pNew;
    return code;
  }

  blockDataDestroy(pNew);
  return code;
}
1673

1674
// Fills pBlock with the configuration dump and stamps the dnode id on it.
//
// dumpConfToDataBlock() populates the block first; afterwards column 0 is
// set to dnodeId for pBlock->info.rows rows. Returns the error from the
// dump if it fails, TSDB_CODE_OUT_OF_RANGE if column 0 does not exist, or
// the result of colDataSetNItems() otherwise.
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  const int32_t dumpCode = dumpConfToDataBlock(pBlock, 1, NULL);
  if (dumpCode != 0) {
    return dumpCode;
  }

  SColumnInfoData *pIdCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (pIdCol != NULL) {
    return colDataSetNItems(pIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
  }

  return TSDB_CODE_OUT_OF_RANGE;
}
1687

1688
// Answers a system-table retrieve request for the dnode variables table.
//
// Only TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is served;
// any other table name, or an undecodable request, yields
// TSDB_CODE_INVALID_MSG.
//
// Response layout, allocated with rpcMallocCont() and attached to
// pMsg->info.rsp on success:
//   SRetrieveMetaTableRsp header
//   int32_t          number of columns (network byte order)
//   SSysTableSchema  one entry per column (bytes/colId in network byte order)
//   encoded data block produced by blockEncode()
//
// Returns TSDB_CODE_SUCCESS on success; on failure every intermediate
// resource is released and the failing step's error code is returned.
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
  // The default-user permission check below is compiled out.
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before the cleanup calls below get a chance to change
    // it, so the logged and returned error is the one from blockEncode().
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
1763

1764
// Handles the response to a stream heartbeat.
//
// A response carrying a non-zero pMsg->code is passed straight to
// streamHbHandleRspErr(). Otherwise the payload that follows the
// SStreamMsgGrpHeader in pMsg->pCont is decoded and handed to
// streamHbProcessRspMsg(). A payload that fails to decode is reported to
// streamHbHandleRspErr() as TSDB_CODE_INVALID_MSG.
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  int64_t          now = taosGetTimestampMs();

  if (pMsg->code != 0) {
    return streamHbHandleRspErr(pMsg->code, now);
  }

  // The heartbeat payload starts right after the group header.
  char   *body = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t bodyLen = pMsg->contLen - sizeof(SStreamMsgGrpHeader);

  tDecoderInit(&dec, (uint8_t *)body, bodyLen);
  if (tDecodeStreamHbRsp(&dec, &hbRsp) >= 0) {
    tDecoderClear(&dec);
    return streamHbProcessRspMsg(&hbRsp);
  }

  // Decode failure: drop whatever was partially decoded before reporting.
  int32_t code = TSDB_CODE_INVALID_MSG;
  tDeepFreeSMStreamHbRspMsg(&hbRsp);
  tDecoderClear(&dec);
  dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
  return streamHbHandleRspErr(code, now);
}
1790

1791

1792
SArray *dmGetMsgHandles() {
556,091✔
1793
  int32_t code = -1;
556,091✔
1794
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
556,091✔
1795
  if (pArray == NULL) {
556,091✔
UNCOV
1796
    return NULL;
×
1797
  }
1798

1799
  // Requests handled by DNODE
1800
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1801
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1802
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1803
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1804
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1805
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1806
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1807
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1808
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1809
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1810
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1811
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1812
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1813
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1814
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1815
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1816
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1817
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1818

1819
  // Requests handled by MNODE
1820
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1821
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1822
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,091✔
1823

1824
  code = 0;
556,091✔
1825

1826
_OVER:
556,091✔
1827
  if (code != 0) {
556,091✔
UNCOV
1828
    taosArrayDestroy(pArray);
×
UNCOV
1829
    return NULL;
×
1830
  } else {
1831
    return pArray;
556,091✔
1832
  }
1833
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc