• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4911

04 Jan 2026 09:05AM UTC coverage: 65.028% (-0.8%) from 65.864%
#4911

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

1517 existing lines in 134 files now uncovered.

195276 of 300296 relevant lines covered (65.03%)

116931714.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.02
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
// Audit db-name/token accessors; declared here, defined below only when TD_ENTERPRISE is not set.
extern void setAuditDbNameToken(char *pDb, char *pToken);
extern void getAuditDbNameToken(char *pDb, char *pToken);

#ifndef TD_ENTERPRISE
// Builds without TD_ENTERPRISE get empty stubs: neither buffer is read or written.
void setAuditDbNameToken(char *pDb, char *pToken) {}
void getAuditDbNameToken(char *pDb, char *pToken) {}
#endif
45

46
SMonVloadInfo tsVinfo = {0};
47
SMnodeLoad    tsMLoad = {0};
48
SDnodeData    tsDnodeData = {0};
49

50
/*
 * Store the dnodeId/clusterId carried by pCfg the first time they are seen.
 *
 * Nothing happens once both pMgmt->pData->dnodeId and pMgmt->pData->clusterId are non-zero.
 * Otherwise the ids are written under the pData write lock, forwarded to the monitor and
 * audit modules, and persisted with dmWriteEps(). A persist failure is logged as an error;
 * the in-memory ids are kept either way.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // This is a failure path, so report it at error level rather than info.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
1,690,900✔
67

68
/*
 * Compare the local ip-white-list version with the one reported in a status response and,
 * when they differ, send a TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL request to the mnode.
 *
 * pMgmt - dnode management context; supplies the local version, the rpc handles and the mnode ep set.
 * ver   - ip-white-list version reported by the mnode.
 *
 * When the versions match nothing is requested; if both are 0 the white list on the server
 * rpc is cleared via rpcSetIpWhite(..., NULL). All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }

  SRetrieveWhiteListReq req = {.ver = pMgmt->pData->ipWhiteVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard as dmSendKeySyncReq/dmSendConfigReq: never serialize into a NULL buffer.
    dError("failed to malloc cont for ip white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
113

114

115

116
/*
 * Compare the local datetime-white-list version with the one reported in a status response
 * and, when they differ, send a TDMT_MND_RETRIEVE_DATETIME_WHITELIST request to the mnode.
 *
 * pMgmt - dnode management context; supplies the local version, the rpc handles and the mnode ep set.
 * ver   - datetime-white-list version reported by the mnode.
 *
 * When the versions match nothing is requested. All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this clears the *ip* white list (rpcSetIpWhite), exactly like
      // dmMayShouldUpdateIpWhiteList does; looks copy-pasted -- confirm whether a
      // time-white-list specific reset is intended here.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }

  SRetrieveWhiteListReq req = {.ver = pMgmt->pData->timeWhiteVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard as dmSendKeySyncReq/dmSendConfigReq: never serialize into a NULL buffer.
    dError("failed to malloc cont for datetime white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
161

162

163

164
/*
 * Compare the local analytics version (taosAnalyGetVersion()) with the one reported in a
 * status response and, when they differ, send a TDMT_MND_RETRIEVE_ANAL_ALGO request to the mnode.
 *
 * pMgmt  - dnode management context; supplies the dnodeId, the client rpc handle and the mnode ep set.
 * newVer - analytics version reported by the mnode.
 *
 * All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Same guard as dmSendKeySyncReq/dmSendConfigReq: never serialize into a NULL buffer.
    dError("failed to malloc cont for analysis function ver request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
203

204
/*
 * Handle the mnode's reply to a status request and release its payload.
 *
 * pMgmt - dnode management context.
 * pRsp  - reply message; pRsp->pCont is always freed before returning.
 *
 * Error reply: if the code is TSDB_CODE_MND_DNODE_NOT_EXIST and this dnode has an id and is
 * not yet marked dropped, it is marked dropped, the state is persisted with dmWriteEps() and
 * SIGINT is raised. Other error codes are ignored here.
 *
 * Successful reply: when the payload deserializes, the dnode version/config/ep list are
 * refreshed if dnodeVer changed, the audit db name and token are applied, and the ip/time
 * white lists and analytics functions are re-requested if their versions moved.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* log macros use it -- confirm.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
             pMgmt->statusSeq);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
  } else {
    SStatusRsp statusRsp = {0};
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }
      // The audit token is a credential: keep it out of the log, only the db name is reported.
      dGInfo("dnode:%d, set auditDB:%s in status rsp received from mnode", pMgmt->pData->dnodeId, statusRsp.auditDB);
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken);
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }
    tFreeSStatusRsp(&statusRsp);
  }
  rpcFreeCont(pRsp->pCont);
}
38,744,690✔
241

242
/*
 * Build a status request from the cached dnode/vnode/mnode/qnode state, send it to the mnode
 * over the status rpc with a timeout, and process the reply.
 *
 * pMgmt - dnode management context. pMgmt->statusSeq is incremented on every call.
 *
 * The cached values (tsDnodeData, tsVinfo, tsMLoad) are read under pData->statusInfolock;
 * tsVinfo.pVloads is moved into the request, so the request must be released with
 * tFreeSStatusReq() on every path after that point. On a send failure with
 * TSDB_CODE_TIMEOUT_ERROR, or on an error reply, the mnode ep set is rotated. A received
 * reply (success or error) is handed to dmProcessStatusRsp(), which frees its payload.
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);

  // checkTime is the parsed form of a fixed timestamp string.
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the error log reports the real failure instead of a stale 0.
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI,
                       NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  // NOTE(review): charset is copied with TD_LOCALE_LEN -- confirm this matches the charset buffer sizes.
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Move the cached vnode loads into the request; from here on `req` owns them.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  if (tsAuditUseToken) {
    getAuditDbNameToken(req.auditDB, req.auditToken);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req, size:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }

  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar fields of req are read after tFreeSStatusReq().
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }

  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
            tbuf, epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
379

380
/*
 * Handle the mnode's reply to a config request and release its payload.
 *
 * pMgmt - dnode management context; pMgmt->path is where configs are persisted.
 * pRsp  - reply message; pRsp->pCont is always freed before returning.
 *
 * Error reply: if the code is TSDB_CODE_MND_DNODE_NOT_EXIST and this dnode has an id and is
 * not yet marked dropped, it is marked dropped, persisted with dmWriteEps() and SIGINT is raised.
 *
 * Successful reply: when the payload deserializes and the mnode reports the config version
 * as not verified, the received global config is persisted, applied to tsCfg and activated
 * with setAllConfigs(). The local config is then persisted and tsConfigInited is set. Any
 * failure while persisting/applying the global config aborts before tsConfigInited is set.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* log macros use it -- confirm.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
  } else {
    bool needUpdate = false;
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
      // Try to use cfg from mnode sdb.
      if (!configRsp.isVersionVerified) {
        uInfo("config version not verified, update config");
        needUpdate = true;
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
        if (code != TSDB_CODE_SUCCESS) {
          dError("failed to persist global config since %s", tstrerror(code));
          goto _exit;
        }
      }
    }
    if (needUpdate) {
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to update config since %s", tstrerror(code));
        goto _exit;
      }
      code = setAllConfigs(tsCfg);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to set all configs since %s", tstrerror(code));
        goto _exit;
      }
    }
    // A failure to persist the local config is logged but does not block initialization.
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
    tsConfigInited = 1;
  }

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
531,469✔
436

437
/*
 * Handle the mnode's reply to a key sync request and release its payload.
 *
 * pMgmt - dnode management context (not read by this function).
 * pRsp  - reply message; pRsp->pCont is always freed before returning.
 *
 * Returns 0 on success; otherwise the reply's error code, TSDB_CODE_INVALID_MSG for an
 * empty payload, or the code returned by deserialization / taoskSaveEncryptKeys().
 *
 * tsEncryptKeysStatus is always refreshed from a decoded reply. When the reply asks for an
 * update and the build has TD_ENTERPRISE and TD_HAS_TAOSK, the keys are written to
 * master.bin/derived.bin under <tsDataDir>/dnode/config, copied into the global key
 * variables, and existing config files are encrypted (a failure there is only a warning).
 */
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced directly; kept from the original, possibly for log macros.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SKeySyncRsp     keySyncRsp = {0};

  if (pRsp->code != 0) {
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
    code = pRsp->code;
    goto _exit;
  }

  if (pRsp->pCont == NULL || pRsp->contLen <= 0) {
    dError("invalid key sync response, empty content");
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
  if (code != 0) {
    dError("failed to deserialize key sync response since %s", tstrerror(code));
    goto _exit;
  }

  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
        tsLocalKeyVersion, keySyncRsp.needUpdate);
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;

  if (!keySyncRsp.needUpdate) {
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
  } else {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
    // Both key files live under <tsDataDir>/dnode/config.
    char masterKeyFile[PATH_MAX] = {0};
    char derivedKeyFile[PATH_MAX] = {0};
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);

    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);

    // Persist first; the same algorithm value is passed for all three algorithm arguments.
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey,
                                keySyncRsp.cfgKey, keySyncRsp.metaKey, keySyncRsp.dataKey, keySyncRsp.algorithm,
                                keySyncRsp.algorithm, keySyncRsp.algorithm, keySyncRsp.keyVersion,
                                keySyncRsp.createTime, keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
    if (code != 0) {
      dError("failed to save encryption keys since %s", tstrerror(code));
      goto _exit;
    }

    // Mirror the synced keys and their metadata into the process-wide variables.
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;

    tsLocalKeyVersion = keySyncRsp.keyVersion;
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);

    // Encrypting already-written plaintext config files is best effort.
    code = taosEncryptExistingCfgFiles(tsDataDir);
    if (code != 0) {
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
      code = 0;
    }
#else
    dWarn("enterprise features not enabled, skipping key sync");
#endif
  }

  code = TSDB_CODE_SUCCESS;

_exit:
  rpcFreeCont(pRsp->pCont);
  return code;
}
523

524
/*
 * Send a key sync request (dnodeId + local key version) to the mnode over the status rpc
 * with a timeout, and process the reply with dmProcessKeySyncRsp().
 *
 * pMgmt - dnode management context; supplies the dnodeId, the status rpc handle and the mnode ep set.
 *
 * All failures are logged and the function returns without retrying. The reply payload is
 * released on every path where a reply was received.
 */
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
  int32_t     code = 0;
  SKeySyncReq req = {0};

  req.dnodeId = pMgmt->pData->dnodeId;
  req.keyVersion = tsLocalKeyVersion;
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so report it as a size.
    dError("failed to malloc cont for key sync req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_KEY_SYNC,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
    // The reply is not passed on to dmProcessKeySyncRsp(), so its payload must be released here.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }

  // dmProcessKeySyncRsp() frees rpcRsp.pCont.
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
  if (code != 0) {
    dError("failed to process key sync rsp since %s", tstrerror(code));
  }
}
580

581
/*
 * Send a config request (local config version, force-read flag and the global config array)
 * to the mnode over the status rpc with a timeout, and process the reply with
 * dmProcessConfigRsp().
 *
 * pMgmt - dnode management context; supplies the status rpc handle and the mnode ep set.
 *
 * All failures are logged and the function returns without retrying. The reply payload is
 * released on every path where a reply was received.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  // NOTE(review): ownership of the array returned by taosGetGlobalCfg() is not visible here; it is not freed.
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so report it as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // NOTE(review): returning here means dmProcessConfigRsp()'s error-code handling is never
    // reached from this caller -- confirm that is intended.
    // The reply is not passed on to dmProcessConfigRsp(), so its payload must be released here.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }

  // dmProcessConfigRsp() frees rpcRsp.pCont.
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
634

635
/*
 * Refresh the cached status inputs (tsDnodeData, tsVinfo, tsMLoad) that dmSendStatusReq()
 * later reads.
 *
 * pMgmt - dnode management context; supplies pData and the vnode/mnode load callbacks.
 *
 * The dnode fields are snapshotted under the pData read lock, vnode and mnode loads are
 * collected through the callbacks, and everything is published under pData->statusInfolock.
 * Freshly collected vnode loads are only stored when the cache slot is empty; otherwise
 * they are destroyed and the cached ones are kept.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected loads were never published; release them instead of leaking.
    taosArrayDestroy(vinfo.pVloads);
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache slot is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
  } else {
    // A previous array is still cached: keep it and drop the new one.
    taosArrayDestroy(vinfo.pVloads);
  }
  vinfo.pVloads = NULL;

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
682

683
// Serialize a notify request and send it to the mnode as a one-way message.
//
// The request is serialized twice: once with a NULL buffer to learn the encoded
// size, then into an RPC-allocated buffer of that size. The message is sent with
// `noResp` set. Every failure is logged and swallowed; nothing is reported to the
// caller.
//
// pMgmt: dnode management object; provides the mnode endpoint set and client RPC handle.
// pReq:  notify request to serialize.
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and handed to the transport.
    dError("failed to malloc notify req, size:%d", contLen);
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
712

713
// Handler for an auth response. The message is not acted upon: an error is
// logged and 0 is returned. Neither argument is read.
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("auth rsp is received, but not supported yet");
  return 0;
}
717

718
// Handler for a grant response. The message is not acted upon: an error is
// logged and 0 is returned. Neither argument is read.
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("grant rsp is received, but not supported yet");
  return 0;
}
722

723
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
61,623✔
724
  int32_t       code = 0;
61,623✔
725
  SDCfgDnodeReq cfgReq = {0};
61,623✔
726
  SConfig      *pCfg = taosGetCfg();
61,623✔
727
  SConfigItem  *pItem = NULL;
61,623✔
728

729
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
61,623✔
730
    return TSDB_CODE_INVALID_MSG;
×
731
  }
732
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
61,623✔
733
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,165✔
734
  }
735

736
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
58,458✔
737

738
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
58,458✔
739
  if (code != 0) {
58,458✔
740
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
181✔
741
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
181✔
742
      return TSDB_CODE_SUCCESS;
181✔
743
    } else {
744
      return code;
×
745
    }
746
  }
747
  if (pItem == NULL) {
58,277✔
748
    return TSDB_CODE_CFG_NOT_FOUND;
×
749
  }
750

751
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
58,277✔
752
    char value[10] = {0};
×
753
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
754
      tsSyncTimeout = 0;
×
755
    }
756

757
    if (tsSyncTimeout > 0) {
×
758
      SConfigItem *pItemTmp = NULL;
×
759
      char         tmp[10] = {0};
×
760

761
      sprintf(tmp, "%d", tsSyncTimeout);
×
762
      TAOS_CHECK_RETURN(
×
763
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
764
      if (pItemTmp == NULL) {
×
765
        return TSDB_CODE_CFG_NOT_FOUND;
×
766
      }
767

768
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
769
      TAOS_CHECK_RETURN(
×
770
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
771
      if (pItemTmp == NULL) {
×
772
        return TSDB_CODE_CFG_NOT_FOUND;
×
773
      }
774
      TAOS_CHECK_RETURN(
×
775
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
776
      if (pItemTmp == NULL) {
×
777
        return TSDB_CODE_CFG_NOT_FOUND;
×
778
      }
779

780
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
781
      TAOS_CHECK_RETURN(
×
782
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
783
      if (pItemTmp == NULL) {
×
784
        return TSDB_CODE_CFG_NOT_FOUND;
×
785
      }
786
      TAOS_CHECK_RETURN(
×
787
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
788
      if (pItemTmp == NULL) {
×
789
        return TSDB_CODE_CFG_NOT_FOUND;
×
790
      }
791
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
792
      if (pItemTmp == NULL) {
×
793
        return TSDB_CODE_CFG_NOT_FOUND;
×
794
      }
795

796
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
797
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
798
      if (pItemTmp == NULL) {
×
799
        return TSDB_CODE_CFG_NOT_FOUND;
×
800
      }
801

802
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
803
      TAOS_CHECK_RETURN(
×
804
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
805
      if (pItemTmp == NULL) {
×
806
        return TSDB_CODE_CFG_NOT_FOUND;
×
807
      }
808
      TAOS_CHECK_RETURN(
×
809
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
810
      if (pItemTmp == NULL) {
×
811
        return TSDB_CODE_CFG_NOT_FOUND;
×
812
      }
813
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
814
      if (pItemTmp == NULL) {
×
815
        return TSDB_CODE_CFG_NOT_FOUND;
×
816
      }
817

818
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
819
            tsSyncTimeout);
820
    }
821
  }
822

823
  if (!isConifgItemLazyMode(pItem)) {
58,277✔
824
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
57,697✔
825

826
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
57,697✔
827
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
828
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
829
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
830

831
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
832
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
833
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
834
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
835

836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
838
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
839

840
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
841
            tsSyncTimeout);
842
    }
843
  }
844

845
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
58,277✔
846
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
9,390✔
847
    if (code != TSDB_CODE_SUCCESS) {
9,390✔
848
      dError("failed to persist global config since %s", tstrerror(code));
×
849
    }
850
  } else {
851
    code = taosPersistLocalConfig(pMgmt->path);
48,887✔
852
    if (code != TSDB_CODE_SUCCESS) {
48,887✔
853
      dError("failed to persist local config since %s", tstrerror(code));
×
854
    }
855
  }
856

857
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
58,277✔
858
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
859

860
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
861
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
862
  }
863

864
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
58,277✔
865
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
58,277✔
866
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
867
  }
868

869
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
58,277✔
870
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
58,277✔
871
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
872
  }
873

874
  if (cfgReq.version > 0) {
58,277✔
875
    tsdmConfigVersion = cfgReq.version;
15,046✔
876
  }
877
  return code;
58,277✔
878
}
879

880
// Handle a "create encrypt key" request.
//
// Enterprise build: decode the request, pass its value to dmUpdateEncryptKey and,
// on success, refresh the in-memory key, its checksum and the loaded status. The
// outcome is written to pMsg->code with an empty response body, and also returned.
// Other builds: no-op returning 0.
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq req = {0};
  int32_t       ret = TSDB_CODE_INVALID_MSG;  // stays in effect only if decoding fails

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &req) == 0) {
    ret = dmUpdateEncryptKey(req.value, true);
    if (ret == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, req.value, strlen(req.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, req.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = ret;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return ret;
#else
  return 0;
#endif
}
905

906
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
907
// Verification plaintext used to validate encryption keys
908
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
909

910
// Save key verification file with encrypted plaintext for each key
911
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
×
912
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
913
                                     int32_t metaAlgorithm) {
914
  char    verifyFile[PATH_MAX] = {0};
×
915
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
×
916
                            TD_DIRSEP, TD_DIRSEP);
917
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
×
918
    dError("failed to build key verification file path");
×
919
    return TSDB_CODE_OUT_OF_BUFFER;
×
920
  }
921

922
  int32_t     code = 0;
×
923
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
924
  int32_t     plaintextLen = strlen(plaintext);
×
925

926
  // Array of keys and their algorithms
927
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
928
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
929
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
930

931
  // Calculate total buffer size
932
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
×
933
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
×
934
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
×
935
  int32_t totalSize = headerSize + perKeySize * 5;
×
936

937
  // Allocate buffer for all data
938
  char *buffer = taosMemoryMalloc(totalSize);
×
939
  if (buffer == NULL) {
×
940
    dError("failed to allocate memory for key verification buffer");
×
941
    return terrno;
×
942
  }
943

944
  char *ptr = buffer;
×
945

946
  // Write magic number and version to buffer
947
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
×
948
  uint16_t version = 1;
×
949
  memcpy(ptr, &magic, sizeof(magic));
×
950
  ptr += sizeof(magic);
×
951
  memcpy(ptr, &version, sizeof(version));
×
952
  ptr += sizeof(version);
×
953

954
  // Encrypt all keys and write to buffer
955
  char paddedPlaintext[512] = {0};
×
956
  memcpy(paddedPlaintext, plaintext, plaintextLen);
×
957

958
  for (int i = 0; i < 5; i++) {
×
959
    char encrypted[512] = {0};
×
960

961
    // Encrypt the verification plaintext with this key using CBC
962
    SCryptOpts opts = {0};
×
963
    opts.len = encryptedLen;
×
964
    opts.source = paddedPlaintext;
×
965
    opts.result = encrypted;
×
966
    opts.unitLen = 16;
×
967
    opts.pOsslAlgrName =
×
968
        (algorithms[i] == TSDB_ENCRYPT_ALGO_SM4) ? TSDB_ENCRYPT_ALGO_SM4_STR : TSDB_ENCRYPT_ALGO_NONE_STR;
×
969
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
970

971
    int32_t count = CBC_Encrypt(&opts);
×
972
    if (count != opts.len) {
×
973
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
974
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
975
             tstrerror(code));
976
      taosMemoryFree(buffer);
×
977
      return code;
×
978
    }
979

980
    // Write to buffer: algorithm + encrypted length + encrypted data
981
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
×
982
    ptr += sizeof(int32_t);
×
983
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
×
984
    ptr += sizeof(int32_t);
×
985
    memcpy(ptr, encrypted, encryptedLen);
×
986
    ptr += encryptedLen;
×
987

988
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
×
989
  }
990

991
  // Write all data to file in one operation
992
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
993
  if (pFile == NULL) {
×
994
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
995
    taosMemoryFree(buffer);
×
996
    return TSDB_CODE_FILE_CORRUPTED;
×
997
  }
998

999
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
×
1000
  (void)taosCloseFile(&pFile);
×
1001
  taosMemoryFree(buffer);
×
1002

1003
  if (written != totalSize) {
×
1004
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1005
    return TSDB_CODE_FILE_CORRUPTED;
×
1006
  }
1007

1008
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
×
1009
  return 0;
×
1010
}
1011

1012
// Verify all encryption keys by decrypting and comparing with original plaintext
1013
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
×
1014
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1015
                                      int32_t metaAlgorithm) {
1016
  char    verifyFile[PATH_MAX] = {0};
×
1017
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
×
1018
                            TD_DIRSEP, TD_DIRSEP);
1019
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
×
1020
    dError("failed to build key verification file path");
×
1021
    return TSDB_CODE_OUT_OF_BUFFER;
×
1022
  }
1023

1024
  // Get file size
1025
  int64_t fileSize = 0;
×
1026
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
×
1027
    // File doesn't exist, create it with current keys
1028
    dInfo("key verification file not found, creating new one");
×
1029
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
×
1030
  }
1031

1032
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1033
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1034
    return TSDB_CODE_FILE_CORRUPTED;
×
1035
  }
1036

1037
  // Allocate buffer and read entire file
1038
  char *buffer = taosMemoryMalloc(fileSize);
×
1039
  if (buffer == NULL) {
×
1040
    dError("failed to allocate memory for key verification buffer");
×
1041
    return terrno;
×
1042
  }
1043

1044
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1045
  if (pFile == NULL) {
×
1046
    dError("failed to open key verification file:%s", verifyFile);
×
1047
    taosMemoryFree(buffer);
×
1048
    return TSDB_CODE_FILE_CORRUPTED;
×
1049
  }
1050

1051
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1052
  (void)taosCloseFile(&pFile);
×
1053

1054
  if (bytesRead != fileSize) {
×
1055
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1056
    taosMemoryFree(buffer);
×
1057
    return TSDB_CODE_FILE_CORRUPTED;
×
1058
  }
1059

1060
  int32_t     code = 0;
×
1061
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1062
  int32_t     plaintextLen = strlen(plaintext);
×
1063
  const char *ptr = buffer;
×
1064

1065
  // Parse and verify header
1066
  uint32_t magic = 0;
×
1067
  uint16_t version = 0;
×
1068
  memcpy(&magic, ptr, sizeof(magic));
×
1069
  ptr += sizeof(magic);
×
1070
  memcpy(&version, ptr, sizeof(version));
×
1071
  ptr += sizeof(version);
×
1072

1073
  if (magic != 0x544B5659) {
×
1074
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1075
    taosMemoryFree(buffer);
×
1076
    return TSDB_CODE_FILE_CORRUPTED;
×
1077
  }
1078

1079
  // Array of keys and their algorithms
1080
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1081
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1082
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1083

1084
  // Verify each key from buffer
1085
  for (int i = 0; i < 5; i++) {
×
1086
    // Check if we have enough data remaining
1087
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1088
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1089
      taosMemoryFree(buffer);
×
1090
      return TSDB_CODE_FILE_CORRUPTED;
×
1091
    }
1092

1093
    int32_t savedAlgo = 0;
×
1094
    int32_t encryptedLen = 0;
×
1095

1096
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1097
    ptr += sizeof(int32_t);
×
1098
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1099
    ptr += sizeof(int32_t);
×
1100

1101
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1102
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1103
      taosMemoryFree(buffer);
×
1104
      return TSDB_CODE_FILE_CORRUPTED;
×
1105
    }
1106

1107
    if (ptr - buffer + encryptedLen > fileSize) {
×
1108
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1109
      taosMemoryFree(buffer);
×
1110
      return TSDB_CODE_FILE_CORRUPTED;
×
1111
    }
1112

1113
    uint8_t encrypted[512] = {0};
×
1114
    memcpy(encrypted, ptr, encryptedLen);
×
1115
    ptr += encryptedLen;
×
1116

1117
    // Decrypt with current key using CBC
1118
    char decrypted[512] = {0};
×
1119

1120
    SCryptOpts opts = {0};
×
1121
    opts.len = encryptedLen;
×
1122
    opts.source = (char *)encrypted;
×
1123
    opts.result = decrypted;
×
1124
    opts.unitLen = 16;
×
1125
    opts.pOsslAlgrName = (savedAlgo == TSDB_ENCRYPT_ALGO_SM4) ? TSDB_ENCRYPT_ALGO_SM4_STR : TSDB_ENCRYPT_ALGO_NONE_STR;
×
1126
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1127

1128
    int32_t count = CBC_Decrypt(&opts);
×
1129
    if (count != opts.len) {
×
1130
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1131
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1132
             count, opts.len, tstrerror(code));
1133
      taosMemoryFree(buffer);
×
1134
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1135
    }
1136

1137
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1138
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1139
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1140
      taosMemoryFree(buffer);
×
1141
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1142
    }
1143

1144
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1145
  }
1146

1147
  taosMemoryFree(buffer);
×
1148
  dInfo("all encryption keys verified successfully");
×
1149
  return 0;
×
1150
}
1151

1152
// Public API: Verify and initialize encryption keys at startup
1153
int32_t dmVerifyAndInitEncryptionKeys(void) {
533,319✔
1154
  // Skip verification in dump sdb mode (taosd -s)
1155
  if (tsSkipKeyCheckMode) {
533,319✔
1156
    dInfo("skip encryption key verification in some special check mode");
×
1157
    return 0;
×
1158
  }
1159

1160
  // Check if encryption keys are loaded
1161
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
533,319✔
1162
    dDebug("encryption keys not loaded, skipping verification");
533,319✔
1163
    return 0;
533,319✔
1164
  }
1165

1166
  // Get key file paths
1167
  char    masterKeyFile[PATH_MAX] = {0};
×
1168
  char    derivedKeyFile[PATH_MAX] = {0};
×
1169
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
1170
                            TD_DIRSEP, TD_DIRSEP);
1171
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
×
1172
    dError("failed to build master key file path");
×
1173
    return TSDB_CODE_OUT_OF_BUFFER;
×
1174
  }
1175

1176
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
1177
                    TD_DIRSEP, TD_DIRSEP);
1178
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
×
1179
    dError("failed to build derived key file path");
×
1180
    return TSDB_CODE_OUT_OF_BUFFER;
×
1181
  }
1182

1183
  // Load encryption keys
1184
  char    svrKey[129] = {0};
×
1185
  char    dbKey[129] = {0};
×
1186
  char    cfgKey[129] = {0};
×
1187
  char    metaKey[129] = {0};
×
1188
  char    dataKey[129] = {0};
×
1189
  int32_t algorithm = 0;
×
1190
  int32_t cfgAlgorithm = 0;
×
1191
  int32_t metaAlgorithm = 0;
×
1192
  int32_t fileVersion = 0;
×
1193
  int32_t keyVersion = 0;
×
1194
  int64_t createTime = 0;
×
1195
  int64_t svrKeyUpdateTime = 0;
×
1196
  int64_t dbKeyUpdateTime = 0;
×
1197

1198
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
×
1199
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1200
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1201
  if (code != 0) {
×
1202
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1203
    return code;
×
1204
  }
1205

1206
  // Verify all keys
1207
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
×
1208
  if (code != 0) {
×
1209
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1210
    return code;
×
1211
  }
1212

1213
  dInfo("encryption keys verified and initialized successfully");
×
1214
  return 0;
×
1215
}
1216
#else
1217
// Variant compiled when TD_ENTERPRISE and TD_HAS_TAOSK are not both defined:
// there are no keys to verify, so the check always succeeds.
int32_t dmVerifyAndInitEncryptionKeys(void) {
  return 0;
}
1221
#endif
1222

1223
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1224
static int32_t dmUpdateSvrKey(const char *newKey) {
×
1225
  if (newKey == NULL || newKey[0] == '\0') {
×
1226
    dError("invalid new SVR_KEY, key is empty");
×
1227
    return TSDB_CODE_INVALID_PARA;
×
1228
  }
1229

1230
  char masterKeyFile[PATH_MAX] = {0};
×
1231
  char derivedKeyFile[PATH_MAX] = {0};
×
1232

1233
  // Build path to key files
1234
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
1235
                            TD_DIRSEP, TD_DIRSEP);
1236
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
×
1237
    dError("failed to build master key file path");
×
1238
    return TSDB_CODE_OUT_OF_BUFFER;
×
1239
  }
1240

1241
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
1242
                    TD_DIRSEP, TD_DIRSEP);
1243
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
×
1244
    dError("failed to build derived key file path");
×
1245
    return TSDB_CODE_OUT_OF_BUFFER;
×
1246
  }
1247

1248
  // Load current keys
1249
  char    svrKey[129] = {0};
×
1250
  char    dbKey[129] = {0};
×
1251
  char    cfgKey[129] = {0};
×
1252
  char    metaKey[129] = {0};
×
1253
  char    dataKey[129] = {0};
×
1254
  int32_t algorithm = 0;
×
1255
  int32_t cfgAlgorithm = 0;
×
1256
  int32_t metaAlgorithm = 0;
×
1257
  int32_t fileVersion = 0;
×
1258
  int32_t keyVersion = 0;
×
1259
  int64_t createTime = 0;
×
1260
  int64_t svrKeyUpdateTime = 0;
×
1261
  int64_t dbKeyUpdateTime = 0;
×
1262

1263
  int32_t code =
1264
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
1265
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1266
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1267
  if (code != 0) {
×
1268
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1269
    return code;
×
1270
  }
1271

1272
  // Update SVR_KEY
1273
  int64_t now = taosGetTimestampMs();
×
1274
  int32_t newKeyVersion = keyVersion + 1;
×
1275

1276
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
×
1277
  tstrncpy(svrKey, newKey, sizeof(svrKey));
×
1278
  svrKeyUpdateTime = now;
×
1279

1280
  // Save updated keys (use algorithm for all keys for backward compatibility)
1281
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
×
1282
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1283
  if (code != 0) {
×
1284
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1285
    return code;
×
1286
  }
1287

1288
  // Update key verification file with new SVR_KEY
1289
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
×
1290
  if (code != 0) {
×
1291
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1292
    // Don't fail the operation if verification file update fails
1293
  }
1294

1295
  // Update global variables
1296
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
1297
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
1298
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
1299
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
1300
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
1301
  tsEncryptAlgorithmType = algorithm;
×
1302
  tsEncryptFileVersion = fileVersion;
×
1303
  tsEncryptKeyVersion = newKeyVersion;
×
1304
  tsEncryptKeyCreateTime = createTime;
×
1305
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
1306
  tsDbKeyUpdateTime = dbKeyUpdateTime;
×
1307

1308
  // Update encryption key status for backward compatibility
1309
  int keyLen = strlen(tsDataKey);
×
1310
  if (keyLen > ENCRYPT_KEY_LEN) {
×
1311
    keyLen = ENCRYPT_KEY_LEN;
×
1312
  }
1313
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
×
1314
  memcpy(tsEncryptKey, tsDataKey, keyLen);
×
1315
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
×
1316
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
×
1317

1318
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
×
1319
  return 0;
×
1320
}
1321

1322
static int32_t dmUpdateDbKey(const char *newKey) {
×
1323
  if (newKey == NULL || newKey[0] == '\0') {
×
1324
    dError("invalid new DB_KEY, key is empty");
×
1325
    return TSDB_CODE_INVALID_PARA;
×
1326
  }
1327

1328
  char masterKeyFile[PATH_MAX] = {0};
×
1329
  char derivedKeyFile[PATH_MAX] = {0};
×
1330

1331
  // Build path to key files
1332
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
1333
                            TD_DIRSEP, TD_DIRSEP);
1334
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
×
1335
    dError("failed to build master key file path");
×
1336
    return TSDB_CODE_OUT_OF_BUFFER;
×
1337
  }
1338

1339
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
1340
                    TD_DIRSEP, TD_DIRSEP);
1341
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
×
1342
    dError("failed to build derived key file path");
×
1343
    return TSDB_CODE_OUT_OF_BUFFER;
×
1344
  }
1345

1346
  // Load current keys
1347
  char    svrKey[129] = {0};
×
1348
  char    dbKey[129] = {0};
×
1349
  char    cfgKey[129] = {0};
×
1350
  char    metaKey[129] = {0};
×
1351
  char    dataKey[129] = {0};
×
1352
  int32_t algorithm = 0;
×
1353
  int32_t cfgAlgorithm = 0;
×
1354
  int32_t metaAlgorithm = 0;
×
1355
  int32_t fileVersion = 0;
×
1356
  int32_t keyVersion = 0;
×
1357
  int64_t createTime = 0;
×
1358
  int64_t svrKeyUpdateTime = 0;
×
1359
  int64_t dbKeyUpdateTime = 0;
×
1360

1361
  int32_t code =
1362
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
1363
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1364
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1365
  if (code != 0) {
×
1366
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1367
    return code;
×
1368
  }
1369

1370
  // Update DB_KEY
1371
  int64_t now = taosGetTimestampMs();
×
1372
  int32_t newKeyVersion = keyVersion + 1;
×
1373

1374
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
×
1375
  tstrncpy(dbKey, newKey, sizeof(dbKey));
×
1376
  dbKeyUpdateTime = now;
×
1377

1378
  // Save updated keys (use algorithm for all keys for backward compatibility)
1379
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
×
1380
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1381
  if (code != 0) {
×
1382
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1383
    return code;
×
1384
  }
1385

1386
  // Update key verification file with new DB_KEY
1387
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
×
1388
  if (code != 0) {
×
1389
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1390
    // Don't fail the operation if verification file update fails
1391
  }
1392

1393
  // Update global variables
1394
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
1395
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
1396
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
1397
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
1398
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
1399
  tsEncryptAlgorithmType = algorithm;
×
1400
  tsEncryptFileVersion = fileVersion;
×
1401
  tsEncryptKeyVersion = newKeyVersion;
×
1402
  tsEncryptKeyCreateTime = createTime;
×
1403
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
1404
  tsDbKeyUpdateTime = dbKeyUpdateTime;
×
1405

1406
  // Update encryption key status for backward compatibility
1407
  int keyLen = strlen(tsDataKey);
×
1408
  if (keyLen > ENCRYPT_KEY_LEN) {
×
1409
    keyLen = ENCRYPT_KEY_LEN;
×
1410
  }
1411
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
×
1412
  memcpy(tsEncryptKey, tsDataKey, keyLen);
×
1413
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
×
1414
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
×
1415

1416
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
×
1417
  return 0;
×
1418
}
1419
#endif
1420

1421
// Handle an "alter encrypt key" request.
//
// With TD_ENTERPRISE and TD_HAS_TAOSK: decode the request and update SVR_KEY
// (keyType 0) or DB_KEY (keyType 1); any other keyType is TSDB_CODE_INVALID_PARA
// and an undecodable payload is TSDB_CODE_INVALID_MSG. The request is always
// released with tFreeSMAlterEncryptKeyReq.
// Otherwise: the operation is rejected with TSDB_CODE_OPS_NOT_SUPPORT.
//
// In both variants the result is stored in pMsg->code with an empty response
// body, and also returned.
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
  SMAlterEncryptKeyReq req = {0};
  int32_t              ret = 0;

  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    ret = TSDB_CODE_INVALID_MSG;
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(ret));
  } else {
    dInfo("received alter encrypt key req, keyType:%d", req.keyType);

    switch (req.keyType) {
      case 0:  // SVR_KEY
        ret = dmUpdateSvrKey(req.newKey);
        if (ret != 0) {
          dError("failed to update SVR_KEY, since %s", tstrerror(ret));
        } else {
          dInfo("successfully updated SVR_KEY");
        }
        break;

      case 1:  // DB_KEY
        ret = dmUpdateDbKey(req.newKey);
        if (ret != 0) {
          dError("failed to update DB_KEY, since %s", tstrerror(ret));
        } else {
          dInfo("successfully updated DB_KEY");
        }
        break;

      default:
        dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", req.keyType);
        ret = TSDB_CODE_INVALID_PARA;
        break;
    }
  }

  // Common tail for every outcome, including a failed decode.
  tFreeSMAlterEncryptKeyReq(&req);
  pMsg->code = ret;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return ret;
#else
  dError("encryption key management is only available in enterprise edition");
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1469

1470
/*
 * Reload the TLS configuration of every RPC transport referenced by the
 * management message callbacks, in this order: server, client, status client,
 * sync client. The first failure is logged and its code returned; the
 * remaining transports are then left untouched. Returns 0 when all four
 * reloads succeed.
 */
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMsgCb *pCb = &pMgmt->msgCb;

  // Snapshot all transport handles before the first reload call.
  void *hClient = pCb->clientRpc;
  void *hStatus = pCb->statusRpc;
  void *hSync = pCb->syncRpc;
  void *hServer = pCb->serverRpc;

  int32_t ret = rpcReloadTlsConfig(hServer, TAOS_CONN_SERVER);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hClient, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hStatus, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hSync, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(ret));
  }

  return ret;
}
1507

1508
/*
 * Fill pStatus with the current service status of this dnode.
 *
 * The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
 * switched to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a description written to
 * pStatus->details, when the mnode load (if this node reports isMnode) or the
 * first vnode load found has a sync state of ERROR or OFFLINE.
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  // Mnode check: a bad mnode sync state is reported immediately.
  SMonMloadInfo mnodeInfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&mnodeInfo);
  if (mnodeInfo.isMnode) {
    if (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR || mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
               syncStr(mnodeInfo.load.syncState));
      return;
    }
  }

  // Vnode check: stop at the first vnode whose sync state is bad.
  SMonVloadInfo vnodeInfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pVload = taosArrayGet(vnodeInfo.pVloads, idx);
    ++idx;

    if (pVload->syncState != TAOS_SYNC_STATE_ERROR && pVload->syncState != TAOS_SYNC_STATE_OFFLINE) {
      continue;
    }

    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVload->vgId,
             syncStr(pVload->syncState));
    break;
  }

  // The vnode load array obtained above is released here.
  taosArrayDestroy(vnodeInfo.pVloads);
}
1535

1536
/*
 * Handle a "server run status" request.
 *
 * Collects the current status via dmGetServerRunStatus(), serializes it into
 * a buffer obtained from rpcMallocCont() and attaches that buffer to
 * pMsg->info.rsp / pMsg->info.rspLen.
 *
 * Returns 0 on success. On failure a non-zero code is returned and
 * pMsg->info.rsp stays NULL with rspLen 0; the response buffer, if it was
 * already allocated, is released before returning.
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");

  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  // No response is attached until serialization has fully succeeded.
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer has not been handed over to pMsg yet, so free it here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
1569

1570
/*
 * Allocate an empty data block whose columns follow the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * @param ppBlock  receives the new block on success; not written on failure.
 * @return 0 on success. On failure a non-zero code is returned and the
 *         partially built block is destroyed: terrno for allocation
 *         failures, TSDB_CODE_OUT_OF_RANGE when the table is missing from
 *         the metadata returned by getInfosDbMeta().
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  int32_t code = 0;

  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return terrno;
  }

  size_t               size = 0;
  const SSysTableMeta *pMeta = NULL;
  getInfosDbMeta(&pMeta, &size);

  // Locate the schema entry of the dnode variables table.
  const SSysTableMeta *pTable = NULL;
  if (pMeta != NULL) {
    for (size_t i = 0; i < size; ++i) {
      if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
        pTable = &pMeta[i];
        break;
      }
    }
  }

  // Do not fall back to an unrelated table's schema when the lookup fails.
  if (pTable == NULL) {
    code = TSDB_CODE_OUT_OF_RANGE;
    dError("failed to build variables block since meta of table %s not found", TSDB_INS_TABLE_DNODE_VARIABLES);
    goto _exit;
  }

  pBlock->pDataBlock = taosArrayInit(pTable->colNum, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    code = terrno;
    goto _exit;
  }

  // One column descriptor per schema column; column ids are 1-based.
  for (int32_t i = 0; i < pTable->colNum; ++i) {
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = i + 1;
    colInfoData.info.type = pTable->schema[i].type;
    colInfoData.info.bytes = pTable->schema[i].bytes;
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  pBlock->info.hasVarCol = true;

_exit:
  if (code != 0) {
    blockDataDestroy(pBlock);
  } else {
    *ppBlock = pBlock;
  }
  return code;
}
1617

1618
/*
 * Dump the configuration into pBlock via dumpConfToDataBlock() and then set
 * the first column of every row (pBlock->info.rows of them) to dnodeId.
 *
 * Returns the error from dumpConfToDataBlock() if it fails,
 * TSDB_CODE_OUT_OF_RANGE if the block has no first column, otherwise the
 * result of colDataSetNItems().
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t ret = dumpConfToDataBlock(pBlock, 1, NULL);

  if (ret == 0) {
    SColumnInfoData *pFirstCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pFirstCol != NULL) {
      ret = colDataSetNItems(pFirstCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
    } else {
      ret = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return ret;
}
1631

1632
/*
 * Handle a system-table retrieve request addressed to the dnode.
 *
 * Only TSDB_INS_TABLE_DNODE_VARIABLES is served; any other table name, or a
 * request that fails to decode, yields TSDB_CODE_INVALID_MSG. On success the
 * response buffer (header, column count, per-column schema, encoded block)
 * is attached to pMsg->info.rsp / rspLen and TSDB_CODE_SUCCESS is returned.
 * On failure every resource allocated here is released and a non-zero code
 * is returned.
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  // Response layout: header | int32 column count | schema per column | encoded block.
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    // Multi-byte schema fields are written in network byte order.
    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture the error before the cleanup calls get a chance to overwrite
    // terrno, and log that value instead of the still-zero `code`.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
1707

1708
/*
 * Handle a stream heartbeat response.
 *
 * A non-zero pMsg->code is forwarded to streamHbHandleRspErr(). Otherwise the
 * payload that follows the SStreamMsgGrpHeader is decoded into an
 * SMStreamHbRspMsg and passed to streamHbProcessRspMsg(). A payload that is
 * missing, shorter than the header, or fails to decode is reported to
 * streamHbHandleRspErr() as TSDB_CODE_INVALID_MSG.
 *
 * Returns whatever the invoked stream handler returns.
 */
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMStreamHbRspMsg rsp = {0};
  int32_t          code = 0;
  SDecoder         decoder;
  int64_t          currTs = taosGetTimestampMs();

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, currTs);
  }

  // Reject payloads that cannot even hold the group header; otherwise the
  // body length computed below would go negative and the decoder would be
  // initialized with a bogus size.
  if (pMsg->pCont == NULL || (int32_t)pMsg->contLen < (int32_t)sizeof(SStreamMsgGrpHeader)) {
    code = TSDB_CODE_INVALID_MSG;
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t len = (int32_t)pMsg->contLen - (int32_t)sizeof(SStreamMsgGrpHeader);

  tDecoderInit(&decoder, (uint8_t *)msg, len);
  code = tDecodeStreamHbRsp(&decoder, &rsp);
  if (code < 0) {
    code = TSDB_CODE_INVALID_MSG;
    tDeepFreeSMStreamHbRspMsg(&rsp);
    tDecoderClear(&decoder);
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  tDecoderClear(&decoder);

  return streamHbProcessRspMsg(&rsp);
}
1734

1735

1736
SArray *dmGetMsgHandles() {
533,347✔
1737
  int32_t code = -1;
533,347✔
1738
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
533,347✔
1739
  if (pArray == NULL) {
533,347✔
1740
    return NULL;
×
1741
  }
1742

1743
  // Requests handled by DNODE
1744
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1745
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1746
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1747
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1748
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1749
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1750
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1751
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1752
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1753
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1754
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1755
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1756
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1757
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1758
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1759
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1760
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1761

1762
  // Requests handled by MNODE
1763
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1764
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1765
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
533,347✔
1766

1767
  code = 0;
533,347✔
1768

1769
_OVER:
533,347✔
1770
  if (code != 0) {
533,347✔
1771
    taosArrayDestroy(pArray);
×
1772
    return NULL;
×
1773
  } else {
1774
    return pArray;
533,347✔
1775
  }
1776
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc