• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.98
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "crypt.h"
21
#include "monitor.h"
22
#include "stream.h"
23
#include "systable.h"
24
#include "tanalytics.h"
25
#include "tchecksum.h"
26
#include "tencrypt.h"
27
#include "tutil.h"
28

29
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
30
#include "taoskInt.h"
31
#endif
32

33
extern SConfig *tsCfg;
34
extern void     setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId);
35

36
#ifndef TD_ENTERPRISE
37
void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {}
38
#endif
39

40
extern void getAuditDbNameToken(char *pDb, char *pToken);
41

42
#ifndef TD_ENTERPRISE
43
void getAuditDbNameToken(char *pDb, char *pToken) {}
44
#endif
45

46
extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId);
47

48
#ifndef TD_ENTERPRISE
49
void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {}
50
#endif
51

52
SMonVloadInfo tsVinfo = {0};
53
SMnodeLoad    tsMLoad = {0};
54
SDnodeData    tsDnodeData = {0};
55

56
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
1,959,034✔
57
  int32_t code = 0;
1,959,034✔
58
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
1,959,034✔
59
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
489,166✔
60
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
489,166✔
61
    pMgmt->pData->dnodeId = pCfg->dnodeId;
489,166✔
62
    pMgmt->pData->clusterId = pCfg->clusterId;
489,166✔
63
    monSetDnodeId(pCfg->dnodeId);
489,166✔
64
    auditSetDnodeId(pCfg->dnodeId);
489,166✔
65
    code = dmWriteEps(pMgmt->pData);
489,166✔
66
    if (code != 0) {
489,166✔
67
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
1,487✔
68
            tstrerror(code));
69
    }
70
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
489,166✔
71
  }
72
}
1,959,034✔
73

74
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,115,828✔
75
  int32_t code = 0;
2,115,828✔
76
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,115,828✔
77
  if (pMgmt->pData->ipWhiteVer == ver) {
2,115,828✔
78
    if (ver == 0) {
2,115,390✔
79
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,114,988✔
80
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,114,988✔
81
        dError("failed to disable ip white list on dnode");
×
82
      }
83
    }
84
    return;
2,115,390✔
85
  }
86
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
438✔
87

88
  SRetrieveWhiteListReq req = {.ver = oldVer};
438✔
89
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
438✔
90
  if (contLen < 0) {
438✔
91
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
92
    return;
×
93
  }
94
  void *pHead = rpcMallocCont(contLen);
438✔
95
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
438✔
96
  if (contLen < 0) {
438✔
97
    rpcFreeCont(pHead);
×
98
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
99
    return;
×
100
  }
101

102
  SRpcMsg rpcMsg = {.pCont = pHead,
438✔
103
                    .contLen = contLen,
104
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
105
                    .info.ahandle = 0,
106
                    .info.notFreeAhandle = 1,
107
                    .info.refId = 0,
108
                    .info.noResp = 0,
109
                    .info.handle = 0};
110
  SEpSet  epset = {0};
438✔
111

112
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
438✔
113

114
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
438✔
115
  if (code != 0) {
438✔
116
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
117
  }
118
}
119

120

121

122
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,115,828✔
123
  int32_t code = 0;
2,115,828✔
124
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,115,828✔
125
  if (pMgmt->pData->timeWhiteVer == ver) {
2,115,828✔
126
    if (ver == 0) {
2,115,390✔
127
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,114,988✔
128
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,114,988✔
129
        dError("failed to disable time white list on dnode");
×
130
      }
131
    }
132
    return;
2,115,390✔
133
  }
134
  int64_t oldVer = pMgmt->pData->timeWhiteVer;
438✔
135

136
  SRetrieveWhiteListReq req = {.ver = oldVer};
438✔
137
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
438✔
138
  if (contLen < 0) {
438✔
139
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
×
140
    return;
×
141
  }
142
  void *pHead = rpcMallocCont(contLen);
438✔
143
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
438✔
144
  if (contLen < 0) {
438✔
145
    rpcFreeCont(pHead);
×
146
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
×
147
    return;
×
148
  }
149

150
  SRpcMsg rpcMsg = {.pCont = pHead,
438✔
151
                    .contLen = contLen,
152
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
153
                    .info.ahandle = 0,
154
                    .info.notFreeAhandle = 1,
155
                    .info.refId = 0,
156
                    .info.noResp = 0,
157
                    .info.handle = 0};
158
  SEpSet  epset = {0};
438✔
159

160
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
438✔
161

162
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
438✔
163
  if (code != 0) {
438✔
164
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
×
165
  }
166
}
167

168

169

170
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
2,115,828✔
171
  int32_t code = 0;
2,115,828✔
172
  int64_t oldVer = taosAnalyGetVersion();
2,115,828✔
173
  if (oldVer == newVer) return;
2,115,828✔
174
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
175

176
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
177
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
178
  if (contLen < 0) {
×
179
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
180
    return;
×
181
  }
182

183
  void *pHead = rpcMallocCont(contLen);
×
184
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
185
  if (contLen < 0) {
×
186
    rpcFreeCont(pHead);
×
187
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
188
    return;
×
189
  }
190

191
  SRpcMsg rpcMsg = {
×
192
      .pCont = pHead,
193
      .contLen = contLen,
194
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
195
      .info.ahandle = 0,
196
      .info.refId = 0,
197
      .info.noResp = 0,
198
      .info.handle = 0,
199
  };
200
  SEpSet epset = {0};
×
201

202
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
203

204
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
205
  if (code != 0) {
×
206
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
207
  }
208
}
209

210
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
49,573,554✔
211
  const STraceId *trace = &pRsp->info.traceId;
49,573,554✔
212
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
49,573,554✔
213

214
  if (pRsp->code != 0) {
49,573,554✔
215
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
459,291✔
216
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
7,534✔
217
             pMgmt->statusSeq);
218
      pMgmt->pData->dropped = 1;
7,534✔
219
      if (dmWriteEps(pMgmt->pData) != 0) {
7,534✔
220
        dError("failed to write dnode file");
×
221
      }
222
      dInfo("dnode will exit since it is in the dropped state");
7,534✔
223
      (void)raise(SIGINT);
7,534✔
224
    }
225
  } else {
226
    SStatusRsp statusRsp = {0};
49,114,263✔
227
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
51,230,091✔
228
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
2,115,828✔
229
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
2,115,828✔
230
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
1,959,034✔
231
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
232
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
1,959,034✔
233
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
1,959,034✔
234
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
1,959,034✔
235
      }
236
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId);
2,115,828✔
237
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
2,115,828✔
238
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
2,115,828✔
239
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
2,115,828✔
240
    }
241
    tFreeSStatusRsp(&statusRsp);
49,114,263✔
242
  }
243
  rpcFreeCont(pRsp->pCont);
49,573,554✔
244
}
49,573,554✔
245

246
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
49,669,147✔
247
  int32_t    code = 0;
49,669,147✔
248
  SStatusReq req = {0};
49,669,147✔
249
  req.timestamp = taosGetTimestampMs();
49,669,147✔
250
  pMgmt->statusSeq++;
49,669,147✔
251

252
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
49,669,147✔
253
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
49,669,147✔
254
    dError("failed to lock status info lock");
×
255
    return;
×
256
  }
257

258
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
49,669,147✔
259
  req.sver = tsVersion;
49,669,147✔
260
  req.dnodeVer = tsDnodeData.dnodeVer;
49,669,147✔
261
  req.dnodeId = tsDnodeData.dnodeId;
49,669,147✔
262
  req.clusterId = tsDnodeData.clusterId;
49,669,147✔
263
  if (req.clusterId == 0) req.dnodeId = 0;
49,669,147✔
264
  req.rebootTime = tsDnodeData.rebootTime;
49,669,147✔
265
  req.updateTime = tsDnodeData.updateTime;
49,669,147✔
266
  req.numOfCores = tsNumOfCores;
49,669,147✔
267
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
49,669,147✔
268
  req.numOfDiskCfg = tsDiskCfgNum;
49,669,147✔
269
  req.memTotal = tsTotalMemoryKB * 1024;
49,669,147✔
270
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
49,669,147✔
271
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
49,669,147✔
272
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
49,669,147✔
273

274
  req.clusterCfg.statusInterval = tsStatusInterval;
49,669,147✔
275
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
49,669,147✔
276
  req.clusterCfg.checkTime = 0;
49,669,147✔
277
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
49,669,147✔
278
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
49,669,147✔
279
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
49,669,147✔
280
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
49,669,147✔
281
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
49,669,147✔
282
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
49,669,147✔
283
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
49,669,147✔
284
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
49,669,147✔
285
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
49,669,147✔
286
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
49,669,147✔
287
  char timestr[32] = "1970-01-01 00:00:00.00";
49,669,147✔
288
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL) !=
49,669,147✔
289
      0) {
290
    dError("failed to parse time since %s", tstrerror(code));
×
291
  }
292
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
49,669,147✔
293
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
49,669,147✔
294
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
49,669,147✔
295

296
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
49,669,147✔
297

298
  req.pVloads = tsVinfo.pVloads;
49,669,147✔
299
  tsVinfo.pVloads = NULL;
49,669,147✔
300

301
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
49,669,147✔
302
  req.mload = tsMLoad;
49,669,147✔
303

304
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
49,669,147✔
305
    dError("failed to unlock status info lock");
×
306
    return;
×
307
  }
308

309
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
49,669,147✔
310
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
49,669,147✔
311

312
  req.statusSeq = pMgmt->statusSeq;
49,669,147✔
313
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
49,669,147✔
314
  req.analVer = taosAnalyGetVersion();
49,669,147✔
315
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;
49,669,147✔
316

317
  if (tsAuditUseToken) {
49,669,147✔
318
    getAuditDbNameToken(req.auditDB, req.auditToken);
49,667,813✔
319
  }
320

321
  if (tsAuditSaveInSelf) {
49,669,147✔
322
    getAuditEpSet(&req.auditEpSet, &req.auditVgId);
1,334✔
323
  }
324

325
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
49,669,147✔
326
  if (contLen < 0) {
49,669,147✔
327
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
328
    return;
×
329
  }
330

331
  void *pHead = rpcMallocCont(contLen);
49,669,147✔
332
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
49,669,147✔
333
  if (contLen < 0) {
49,669,147✔
334
    rpcFreeCont(pHead);
×
335
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
336
    return;
×
337
  }
338
  tFreeSStatusReq(&req);
49,669,147✔
339

340
  SRpcMsg rpcMsg = {.pCont = pHead,
49,669,147✔
341
                    .contLen = contLen,
342
                    .msgType = TDMT_MND_STATUS,
343
                    .info.ahandle = 0,
344
                    .info.notFreeAhandle = 1,
345
                    .info.refId = 0,
346
                    .info.noResp = 0,
347
                    .info.handle = 0};
348
  SRpcMsg rpcRsp = {0};
49,669,147✔
349

350
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
49,669,147✔
351

352
  SEpSet epSet = {0};
49,669,147✔
353
  int8_t epUpdated = 0;
49,669,147✔
354
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
49,669,147✔
355

356
  if (dDebugFlag & DEBUG_TRACE) {
49,669,147✔
357
    char tbuf[512];
1,813,408✔
358
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
1,816,426✔
359
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
1,816,426✔
360
  }
361
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
49,669,147✔
362
  if (code != 0) {
49,669,147✔
363
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
95,593✔
364
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
95,593✔
365
      dmRotateMnodeEpSet(pMgmt->pData);
95,593✔
366
      char tbuf[512];
95,593✔
367
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
95,593✔
368
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
95,593✔
369
            tbuf, epSet.inUse);
370
    }
371
    return;
95,593✔
372
  }
373

374
  if (rpcRsp.code != 0) {
49,573,554✔
375
    dmRotateMnodeEpSet(pMgmt->pData);
459,291✔
376
    char tbuf[512];
459,291✔
377
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
459,291✔
378
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
459,291✔
379
          tbuf, epSet.inUse);
380
  } else {
381
    if (epUpdated == 1) {
49,114,263✔
382
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
90,565✔
383
    }
384
  }
385
  dmProcessStatusRsp(pMgmt, &rpcRsp);
49,573,554✔
386
}
387

388
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
629,965✔
389
  const STraceId *trace = &pRsp->info.traceId;
629,965✔
390
  int32_t         code = 0;
629,965✔
391
  SConfigRsp      configRsp = {0};
629,965✔
392
  bool            needStop = false;
629,965✔
393

394
  if (pRsp->code != 0) {
629,965✔
395
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
396
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
397
      pMgmt->pData->dropped = 1;
×
398
      if (dmWriteEps(pMgmt->pData) != 0) {
×
399
        dError("failed to write dnode file");
×
400
      }
401
      dInfo("dnode will exit since it is in the dropped state");
×
402
      (void)raise(SIGINT);
×
403
    }
404
  } else {
405
    bool needUpdate = false;
629,965✔
406
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
1,259,930✔
407
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
629,965✔
408
      // Try to use cfg from mnode sdb.
409
      if (!configRsp.isVersionVerified) {
629,965✔
410
        uInfo("config version not verified, update config");
491,124✔
411
        needUpdate = true;
491,124✔
412
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
491,124✔
413
        if (code != TSDB_CODE_SUCCESS) {
491,124✔
414
          dError("failed to persist global config since %s", tstrerror(code));
1,359✔
415
          goto _exit;
1,359✔
416
        }
417
      }
418
    }
419
    if (needUpdate) {
628,606✔
420
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
489,765✔
421
      if (code != TSDB_CODE_SUCCESS) {
489,765✔
422
        dError("failed to update config since %s", tstrerror(code));
×
423
        goto _exit;
×
424
      }
425
      code = setAllConfigs(tsCfg);
489,765✔
426
      if (code != TSDB_CODE_SUCCESS) {
489,765✔
427
        dError("failed to set all configs since %s", tstrerror(code));
1,560✔
428
        goto _exit;
1,560✔
429
      }
430
    }
431
    code = taosPersistLocalConfig(pMgmt->path);
627,046✔
432
    if (code != TSDB_CODE_SUCCESS) {
627,046✔
433
      dError("failed to persist local config since %s", tstrerror(code));
×
434
    }
435
    tsConfigInited = 1;
627,046✔
436
  }
437
_exit:
629,965✔
438
  tFreeSConfigRsp(&configRsp);
629,965✔
439
  rpcFreeCont(pRsp->pCont);
629,965✔
440
  if (needStop) {
629,965✔
441
    dmStop();
×
442
  }
443
}
629,965✔
444

445
int32_t dmProcessKeySyncRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
603,852✔
446
  const STraceId *trace = &pRsp->info.traceId;
603,852✔
447
  int32_t         code = 0;
603,852✔
448
  SKeySyncRsp     keySyncRsp = {0};
603,852✔
449

450
  if (pRsp->code != 0) {
603,852✔
451
    dError("failed to sync keys from mnode since %s", tstrerror(pRsp->code));
×
452
    code = pRsp->code;
×
453
    goto _exit;
×
454
  }
455

456
  if (pRsp->pCont == NULL || pRsp->contLen <= 0) {
603,852✔
457
    dError("invalid key sync response, empty content");
×
458
    code = TSDB_CODE_INVALID_MSG;
×
459
    goto _exit;
×
460
  }
461

462
  code = tDeserializeSKeySyncRsp(pRsp->pCont, pRsp->contLen, &keySyncRsp);
603,852✔
463
  if (code != 0) {
603,852✔
464
    dError("failed to deserialize key sync response since %s", tstrerror(code));
×
465
    goto _exit;
×
466
  }
467

468
  dInfo("received key sync response, mnode keyVersion:%d, local keyVersion:%d, needUpdate:%d", keySyncRsp.keyVersion,
603,852✔
469
        tsLocalKeyVersion, keySyncRsp.needUpdate);
470
  tsEncryptKeysStatus = keySyncRsp.encryptionKeyStatus;
603,852✔
471
  if (keySyncRsp.needUpdate) {
603,852✔
472
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
473
    // Get encrypt file path from tsDataDir
474
    char masterKeyFile[PATH_MAX] = {0};
1,592✔
475
    char derivedKeyFile[PATH_MAX] = {0};
1,592✔
476
    snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,592✔
477
             TD_DIRSEP);
478
    snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
1,592✔
479
             TD_DIRSEP);
480

481
    dInfo("updating local encryption keys from mnode, key file is saved in %s and %s, keyVersion:%d -> %d",
1,592✔
482
          masterKeyFile, derivedKeyFile, tsLocalKeyVersion, keySyncRsp.keyVersion);
483

484
    // Save keys to master.bin and derived.bin
485
    // Use the same algorithm for cfg and meta keys (backward compatible)
486
    code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, keySyncRsp.svrKey, keySyncRsp.dbKey, keySyncRsp.cfgKey, keySyncRsp.metaKey,
1,592✔
487
                                keySyncRsp.dataKey, keySyncRsp.algorithm, keySyncRsp.algorithm, keySyncRsp.algorithm,
488
                                keySyncRsp.keyVersion, keySyncRsp.createTime,
489
                                keySyncRsp.svrKeyUpdateTime, keySyncRsp.dbKeyUpdateTime);
490
    if (code != 0) {
1,592✔
491
      dError("failed to save encryption keys since %s", tstrerror(code));
×
492
      goto _exit;
×
493
    }
494

495
    // Update global variables with synced keys
496
    tstrncpy(tsSvrKey, keySyncRsp.svrKey, sizeof(tsSvrKey));
1,592✔
497
    tstrncpy(tsDbKey, keySyncRsp.dbKey, sizeof(tsDbKey));
1,592✔
498
    tstrncpy(tsCfgKey, keySyncRsp.cfgKey, sizeof(tsCfgKey));
1,592✔
499
    tstrncpy(tsMetaKey, keySyncRsp.metaKey, sizeof(tsMetaKey));
1,592✔
500
    tstrncpy(tsDataKey, keySyncRsp.dataKey, sizeof(tsDataKey));
1,592✔
501
    tsEncryptAlgorithmType = keySyncRsp.algorithm;
1,592✔
502
    tsEncryptKeyVersion = keySyncRsp.keyVersion;
1,592✔
503
    tsEncryptKeyCreateTime = keySyncRsp.createTime;
1,592✔
504
    tsSvrKeyUpdateTime = keySyncRsp.svrKeyUpdateTime;
1,592✔
505
    tsDbKeyUpdateTime = keySyncRsp.dbKeyUpdateTime;
1,592✔
506

507
    // Update local key version
508
    tsLocalKeyVersion = keySyncRsp.keyVersion;
1,592✔
509
    dInfo("successfully updated local encryption keys to version:%d", tsLocalKeyVersion);
1,592✔
510

511
    // Encrypt existing plaintext config files
512
    code = taosEncryptExistingCfgFiles(tsDataDir);
1,592✔
513
    if (code != 0) {
1,592✔
514
      dWarn("failed to encrypt existing config files since %s, will retry on next write", tstrerror(code));
×
515
      // Don't fail the key sync, files will be encrypted on next write
516
      code = 0;
×
517
    }
518
#else
519
    dWarn("enterprise features not enabled, skipping key sync");
520
#endif
521
  } else {
522
    dDebug("local keys are up to date, version:%d", tsLocalKeyVersion);
602,260✔
523
  }
524
  
525
  code = TSDB_CODE_SUCCESS;
603,852✔
526

527
_exit:
603,852✔
528
  rpcFreeCont(pRsp->pCont);
603,852✔
529
  return code;
603,852✔
530
}
531

532
void dmSendKeySyncReq(SDnodeMgmt *pMgmt) {
614,469✔
533
  int32_t     code = 0;
614,469✔
534
  SKeySyncReq req = {0};
614,469✔
535

536
  req.dnodeId = pMgmt->pData->dnodeId;
614,469✔
537
  req.keyVersion = tsLocalKeyVersion;
614,469✔
538
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d", req.dnodeId, req.keyVersion);
614,469✔
539

540
  int32_t contLen = tSerializeSKeySyncReq(NULL, 0, &req);
614,469✔
541
  if (contLen < 0) {
614,469✔
542
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
UNCOV
543
    return;
×
544
  }
545

546
  void *pHead = rpcMallocCont(contLen);
614,469✔
547
  if (pHead == NULL) {
614,469✔
548
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
549
    return;
×
550
  }
551
  contLen = tSerializeSKeySyncReq(pHead, contLen, &req);
614,469✔
552
  if (contLen < 0) {
614,469✔
553
    rpcFreeCont(pHead);
×
554
    dError("failed to serialize key sync req since %s", tstrerror(contLen));
×
555
    return;
×
556
  }
557

558
  SRpcMsg rpcMsg = {.pCont = pHead,
614,469✔
559
                    .contLen = contLen,
560
                    .msgType = TDMT_MND_KEY_SYNC,
561
                    .info.ahandle = 0,
562
                    .info.notFreeAhandle = 1,
563
                    .info.refId = 0,
564
                    .info.noResp = 0,
565
                    .info.handle = 0};
566
  SRpcMsg rpcRsp = {0};
614,469✔
567

568
  SEpSet epSet = {0};
614,469✔
569
  int8_t epUpdated = 0;
614,469✔
570
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
614,469✔
571

572
  dDebug("send key sync req to mnode, dnodeId:%d keyVersion:%d, begin to send rpc msg", req.dnodeId, req.keyVersion);
614,469✔
573
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
614,469✔
574
  if (code != 0) {
614,469✔
575
    dError("failed to SendRecv key sync req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
10,617✔
576
    return;
10,617✔
577
  }
578
  if (rpcRsp.code != 0) {
603,852✔
579
    dError("failed to send key sync req since %s", tstrerror(rpcRsp.code));
×
580
    return;
×
581
  }
582
  code = dmProcessKeySyncRsp(pMgmt, &rpcRsp);
603,852✔
583
  if (code != 0) {
603,852✔
584
    dError("failed to process key sync rsp since %s", tstrerror(code));
×
585
    return;
×
586
  }
587
}
588

589
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
636,292✔
590
  int32_t    code = 0;
636,292✔
591
  SConfigReq req = {0};
636,292✔
592

593
  req.cver = tsdmConfigVersion;
636,292✔
594
  req.forceReadConfig = true;
636,292✔
595
  req.array = taosGetGlobalCfg(tsCfg);
636,292✔
596
  dDebug("send config req to mnode, configVersion:%d", req.cver);
636,292✔
597

598
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
636,292✔
599
  if (contLen < 0) {
636,292✔
600
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
601
    return;
×
602
  }
603

604
  void *pHead = rpcMallocCont(contLen);
636,292✔
605
  if (pHead == NULL) {
636,292✔
606
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
607
    return;
×
608
  }
609
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
636,292✔
610
  if (contLen < 0) {
636,292✔
611
    rpcFreeCont(pHead);
×
612
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
613
    return;
×
614
  }
615

616
  SRpcMsg rpcMsg = {.pCont = pHead,
636,292✔
617
                    .contLen = contLen,
618
                    .msgType = TDMT_MND_CONFIG,
619
                    .info.ahandle = 0,
620
                    .info.notFreeAhandle = 1,
621
                    .info.refId = 0,
622
                    .info.noResp = 0,
623
                    .info.handle = 0};
624
  SRpcMsg rpcRsp = {0};
636,292✔
625

626
  SEpSet epSet = {0};
636,292✔
627
  int8_t epUpdated = 0;
636,292✔
628
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
636,292✔
629

630
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
636,292✔
631
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
636,292✔
632
  if (code != 0) {
636,292✔
633
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
6,327✔
634
    return;
6,327✔
635
  }
636
  if (rpcRsp.code != 0) {
629,965✔
637
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
638
    return;
×
639
  }
640
  dmProcessConfigRsp(pMgmt, &rpcRsp);
629,965✔
641
}
642

643
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
50,225,541✔
644
  dDebug("begin to get dnode info");
50,225,541✔
645
  SDnodeData dnodeData = {0};
50,225,541✔
646
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
50,225,541✔
647
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
50,225,541✔
648
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
50,225,541✔
649
  dnodeData.clusterId = pMgmt->pData->clusterId;
50,225,541✔
650
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
50,225,541✔
651
  dnodeData.updateTime = pMgmt->pData->updateTime;
50,225,541✔
652
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
50,225,541✔
653
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
50,225,541✔
654

655
  dDebug("begin to get vnode loads");
50,225,541✔
656
  SMonVloadInfo vinfo = {0};
50,225,541✔
657
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
50,225,541✔
658

659
  dDebug("begin to get mnode loads");
50,225,541✔
660
  SMonMloadInfo minfo = {0};
50,225,541✔
661
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
50,225,541✔
662

663
  dDebug("begin to lock status info");
50,225,541✔
664
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
50,225,541✔
665
    dError("failed to lock status info lock");
×
666
    return;
×
667
  }
668
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
50,225,541✔
669
  tsDnodeData.dnodeId = dnodeData.dnodeId;
50,225,541✔
670
  tsDnodeData.clusterId = dnodeData.clusterId;
50,225,541✔
671
  tsDnodeData.rebootTime = dnodeData.rebootTime;
50,225,541✔
672
  tsDnodeData.updateTime = dnodeData.updateTime;
50,225,541✔
673
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
50,225,541✔
674

675
  if (tsVinfo.pVloads == NULL) {
50,225,541✔
676
    tsVinfo.pVloads = vinfo.pVloads;
48,877,690✔
677
    vinfo.pVloads = NULL;
48,877,690✔
678
  } else {
679
    taosArrayDestroy(vinfo.pVloads);
1,347,851✔
680
    vinfo.pVloads = NULL;
1,347,851✔
681
  }
682

683
  tsMLoad = minfo.load;
50,225,541✔
684

685
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
50,225,541✔
686
    dError("failed to unlock status info lock");
×
687
    return;
×
688
  }
689
}
690

691
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
692
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
693
  if (contLen < 0) {
×
694
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
695
    return;
×
696
  }
697
  void *pHead = rpcMallocCont(contLen);
×
698
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
699
  if (contLen < 0) {
×
700
    rpcFreeCont(pHead);
×
701
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
702
    return;
×
703
  }
704

705
  SRpcMsg rpcMsg = {.pCont = pHead,
×
706
                    .contLen = contLen,
707
                    .msgType = TDMT_MND_NOTIFY,
708
                    .info.ahandle = 0,
709
                    .info.notFreeAhandle = 1,
710
                    .info.refId = 0,
711
                    .info.noResp = 1,
712
                    .info.handle = 0};
713

714
  SEpSet epSet = {0};
×
715
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
716
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
717
    dError("failed to send notify req");
×
718
  }
719
}
720

721
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
722
  dError("auth rsp is received, but not supported yet");
×
723
  return 0;
×
724
}
725

726
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
727
  dError("grant rsp is received, but not supported yet");
×
728
  return 0;
×
729
}
730

731
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
91,947✔
732
  int32_t       code = 0;
91,947✔
733
  SDCfgDnodeReq cfgReq = {0};
91,947✔
734
  SConfig      *pCfg = taosGetCfg();
91,947✔
735
  SConfigItem  *pItem = NULL;
91,947✔
736

737
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
91,947✔
738
    return TSDB_CODE_INVALID_MSG;
×
739
  }
740
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
91,947✔
741
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
3,473✔
742
  }
743

744
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
88,474✔
745

746
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
88,474✔
747
  if (code != 0) {
88,474✔
748
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
204✔
749
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
204✔
750
      return TSDB_CODE_SUCCESS;
204✔
751
    } else {
752
      return code;
×
753
    }
754
  }
755
  if (pItem == NULL) {
88,270✔
756
    return TSDB_CODE_CFG_NOT_FOUND;
×
757
  }
758

759
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
88,270✔
760
    char value[10] = {0};
×
761
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
762
      tsSyncTimeout = 0;
×
763
    }
764

765
    if (tsSyncTimeout > 0) {
×
766
      SConfigItem *pItemTmp = NULL;
×
767
      char         tmp[10] = {0};
×
768

769
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
×
770
      TAOS_CHECK_RETURN(
×
771
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
772
      if (pItemTmp == NULL) {
×
773
        return TSDB_CODE_CFG_NOT_FOUND;
×
774
      }
775

776
      snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
×
777
      TAOS_CHECK_RETURN(
×
778
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
779
      if (pItemTmp == NULL) {
×
780
        return TSDB_CODE_CFG_NOT_FOUND;
×
781
      }
782
      TAOS_CHECK_RETURN(
×
783
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
784
      if (pItemTmp == NULL) {
×
785
        return TSDB_CODE_CFG_NOT_FOUND;
×
786
      }
787

788
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
789
      TAOS_CHECK_RETURN(
×
790
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
791
      if (pItemTmp == NULL) {
×
792
        return TSDB_CODE_CFG_NOT_FOUND;
×
793
      }
794
      TAOS_CHECK_RETURN(
×
795
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
796
      if (pItemTmp == NULL) {
×
797
        return TSDB_CODE_CFG_NOT_FOUND;
×
798
      }
799
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
800
      if (pItemTmp == NULL) {
×
801
        return TSDB_CODE_CFG_NOT_FOUND;
×
802
      }
803

804
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
805
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
806
      if (pItemTmp == NULL) {
×
807
        return TSDB_CODE_CFG_NOT_FOUND;
×
808
      }
809

810
      snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
811
      TAOS_CHECK_RETURN(
×
812
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
813
      if (pItemTmp == NULL) {
×
814
        return TSDB_CODE_CFG_NOT_FOUND;
×
815
      }
816
      TAOS_CHECK_RETURN(
×
817
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
818
      if (pItemTmp == NULL) {
×
819
        return TSDB_CODE_CFG_NOT_FOUND;
×
820
      }
821
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
822
      if (pItemTmp == NULL) {
×
823
        return TSDB_CODE_CFG_NOT_FOUND;
×
824
      }
825

826
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
827
            tsSyncTimeout);
828
    }
829
  }
830

831
  if (!isConifgItemLazyMode(pItem)) {
88,270✔
832
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
87,631✔
833

834
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
87,631✔
835
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
836
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
837
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
838

839
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
840
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
841
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
842
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
843

844
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
845
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
846
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
847

848
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
849
            tsSyncTimeout);
850
    }
851
  }
852

853
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
88,270✔
854
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
15,333✔
855
    if (code != TSDB_CODE_SUCCESS) {
15,333✔
856
      dError("failed to persist global config since %s", tstrerror(code));
×
857
    }
858
  } else {
859
    code = taosPersistLocalConfig(pMgmt->path);
72,937✔
860
    if (code != TSDB_CODE_SUCCESS) {
72,937✔
861
      dError("failed to persist local config since %s", tstrerror(code));
×
862
    }
863
  }
864

865
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
88,270✔
866
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
867

868
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
869
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
870
  }
871

872
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
88,270✔
873
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
88,270✔
874
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
875
  }
876

877
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
88,270✔
878
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
88,270✔
879
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
880
  }
881

882
  if (cfgReq.version > 0) {
88,270✔
883
    tsdmConfigVersion = cfgReq.version;
22,209✔
884
  }
885
  return code;
88,270✔
886
}
887

888
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
176✔
889
#ifdef TD_ENTERPRISE
890
  int32_t       code = 0;
176✔
891
  SDCfgDnodeReq cfgReq = {0};
176✔
892
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
176✔
893
    code = TSDB_CODE_INVALID_MSG;
×
894
    goto _exit;
×
895
  }
896

897
  code = dmUpdateEncryptKey(cfgReq.value, true);
176✔
898
  if (code == 0) {
176✔
899
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
176✔
900
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
176✔
901
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
176✔
902
  }
903

904
_exit:
176✔
905
  pMsg->code = code;
176✔
906
  pMsg->info.rsp = NULL;
176✔
907
  pMsg->info.rspLen = 0;
176✔
908
  return code;
176✔
909
#else
910
  return 0;
911
#endif
912
}
913

914
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
915
// Verification plaintext used to validate encryption keys
916
#define KEY_VERIFY_PLAINTEXT "TDengine_Encryption_Key_Verification_v1.0"
917

918
// Save key verification file with encrypted plaintext for each key
919
static int32_t dmSaveKeyVerification(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
1,851✔
920
                                     const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
921
                                     int32_t metaAlgorithm) {
922
  char    verifyFile[PATH_MAX] = {0};
1,851✔
923
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
1,851✔
924
                            TD_DIRSEP, TD_DIRSEP);
925
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
1,851✔
926
    dError("failed to build key verification file path");
×
927
    return TSDB_CODE_OUT_OF_BUFFER;
×
928
  }
929

930
  int32_t     code = 0;
1,851✔
931
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
1,851✔
932
  int32_t     plaintextLen = strlen(plaintext);
1,851✔
933

934
  // Array of keys and their algorithms
935
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
1,851✔
936
  int32_t     algorithms[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
1,851✔
937
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
1,851✔
938

939
  // Calculate total buffer size
940
  int32_t encryptedLen = ((plaintextLen + 15) / 16) * 16;                 // Padded length for CBC
1,851✔
941
  int32_t headerSize = sizeof(uint32_t) + sizeof(uint16_t);               // magic + version
1,851✔
942
  int32_t perKeySize = sizeof(int32_t) + sizeof(int32_t) + encryptedLen;  // algo + len + encrypted data
1,851✔
943
  int32_t totalSize = headerSize + perKeySize * 5;
1,851✔
944

945
  // Allocate buffer for all data
946
  char *buffer = taosMemoryMalloc(totalSize);
1,851✔
947
  if (buffer == NULL) {
1,851✔
948
    dError("failed to allocate memory for key verification buffer");
×
949
    return terrno;
×
950
  }
951

952
  char *ptr = buffer;
1,851✔
953

954
  // Write magic number and version to buffer
955
  uint32_t magic = 0x544B5659;  // "TKVY" in hex
1,851✔
956
  uint16_t version = 1;
1,851✔
957
  memcpy(ptr, &magic, sizeof(magic));
1,851✔
958
  ptr += sizeof(magic);
1,851✔
959
  memcpy(ptr, &version, sizeof(version));
1,851✔
960
  ptr += sizeof(version);
1,851✔
961

962
  // Encrypt all keys and write to buffer
963
  char paddedPlaintext[512] = {0};
1,851✔
964
  memcpy(paddedPlaintext, plaintext, plaintextLen);
1,851✔
965

966
  for (int i = 0; i < 5; i++) {
11,106✔
967
    char encrypted[512] = {0};
9,255✔
968

969
    // Encrypt the verification plaintext with this key using CBC
970
    SCryptOpts opts = {0};
9,255✔
971
    opts.len = encryptedLen;
9,255✔
972
    opts.source = paddedPlaintext;
9,255✔
973
    opts.result = encrypted;
9,255✔
974
    opts.unitLen = 16;
9,255✔
975
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
9,255✔
976
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
9,255✔
977

978
    int32_t count = CBC_Encrypt(&opts);
9,255✔
979
    if (count != opts.len) {
9,255✔
980
      code = terrno ? terrno : TSDB_CODE_FAILED;
×
981
      dError("failed to encrypt verification for %s, count=%d, expected=%d, since %s", keyNames[i], count, opts.len,
×
982
             tstrerror(code));
983
      taosMemoryFree(buffer);
×
984
      return code;
×
985
    }
986

987
    // Write to buffer: algorithm + encrypted length + encrypted data
988
    memcpy(ptr, &algorithms[i], sizeof(int32_t));
9,255✔
989
    ptr += sizeof(int32_t);
9,255✔
990
    memcpy(ptr, &encryptedLen, sizeof(int32_t));
9,255✔
991
    ptr += sizeof(int32_t);
9,255✔
992
    memcpy(ptr, encrypted, encryptedLen);
9,255✔
993
    ptr += encryptedLen;
9,255✔
994

995
    dDebug("prepared verification for %s: algorithm=%d, encLen=%d", keyNames[i], algorithms[i], encryptedLen);
9,255✔
996
  }
997

998
  // Write all data to file in one operation
999
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
1,851✔
1000
  if (pFile == NULL) {
1,851✔
1001
    dError("failed to create key verification file:%s, errno:%d", verifyFile, errno);
×
1002
    taosMemoryFree(buffer);
×
1003
    return TSDB_CODE_FILE_CORRUPTED;
×
1004
  }
1005

1006
  int64_t written = taosWriteFile(pFile, buffer, totalSize);
1,851✔
1007
  (void)taosCloseFile(&pFile);
1,851✔
1008
  taosMemoryFree(buffer);
1,851✔
1009

1010
  if (written != totalSize) {
1,851✔
1011
    dError("failed to write key verification file, written=%" PRId64 ", expected=%d", written, totalSize);
×
1012
    return TSDB_CODE_FILE_CORRUPTED;
×
1013
  }
1014

1015
  dInfo("successfully saved key verification file:%s, size=%d", verifyFile, totalSize);
1,851✔
1016
  return 0;
1,851✔
1017
}
1018

1019
// Verify all encryption keys by decrypting and comparing with original plaintext
1020
static int32_t dmVerifyEncryptionKeys(const char *svrKey, const char *dbKey, const char *cfgKey, const char *metaKey,
1,585✔
1021
                                      const char *dataKey, int32_t algorithm, int32_t cfgAlgorithm,
1022
                                      int32_t metaAlgorithm) {
1023
  char    verifyFile[PATH_MAX] = {0};
1,585✔
1024
  int32_t nBytes = snprintf(verifyFile, sizeof(verifyFile), "%s%sdnode%sconfig%skey_verify.dat", tsDataDir, TD_DIRSEP,
1,585✔
1025
                            TD_DIRSEP, TD_DIRSEP);
1026
  if (nBytes <= 0 || nBytes >= sizeof(verifyFile)) {
1,585✔
1027
    dError("failed to build key verification file path");
×
1028
    return TSDB_CODE_OUT_OF_BUFFER;
×
1029
  }
1030

1031
  // Get file size
1032
  int64_t fileSize = 0;
1,585✔
1033
  if (taosStatFile(verifyFile, &fileSize, NULL, NULL) < 0) {
1,585✔
1034
    // File doesn't exist, create it with current keys
1035
    dInfo("key verification file not found, creating new one");
1,585✔
1036
    return dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
1,585✔
1037
  }
1038

1039
  if (fileSize <= 0 || fileSize > 10240) {  // Max 10KB
×
1040
    dError("invalid key verification file size: %" PRId64, fileSize);
×
1041
    return TSDB_CODE_FILE_CORRUPTED;
×
1042
  }
1043

1044
  // Allocate buffer and read entire file
1045
  char *buffer = taosMemoryMalloc(fileSize);
×
1046
  if (buffer == NULL) {
×
1047
    dError("failed to allocate memory for key verification buffer");
×
1048
    return terrno;
×
1049
  }
1050

1051
  TdFilePtr pFile = taosOpenFile(verifyFile, TD_FILE_READ);
×
1052
  if (pFile == NULL) {
×
1053
    dError("failed to open key verification file:%s", verifyFile);
×
1054
    taosMemoryFree(buffer);
×
1055
    return TSDB_CODE_FILE_CORRUPTED;
×
1056
  }
1057

1058
  int64_t bytesRead = taosReadFile(pFile, buffer, fileSize);
×
1059
  (void)taosCloseFile(&pFile);
×
1060

1061
  if (bytesRead != fileSize) {
×
1062
    dError("failed to read key verification file, read=%" PRId64 ", expected=%" PRId64, bytesRead, fileSize);
×
1063
    taosMemoryFree(buffer);
×
1064
    return TSDB_CODE_FILE_CORRUPTED;
×
1065
  }
1066

1067
  int32_t     code = 0;
×
1068
  const char *plaintext = KEY_VERIFY_PLAINTEXT;
×
1069
  int32_t     plaintextLen = strlen(plaintext);
×
1070
  const char *ptr = buffer;
×
1071

1072
  // Parse and verify header
1073
  uint32_t magic = 0;
×
1074
  uint16_t version = 0;
×
1075
  memcpy(&magic, ptr, sizeof(magic));
×
1076
  ptr += sizeof(magic);
×
1077
  memcpy(&version, ptr, sizeof(version));
×
1078
  ptr += sizeof(version);
×
1079

1080
  if (magic != 0x544B5659) {
×
1081
    dError("invalid magic number in key verification file: 0x%x", magic);
×
1082
    taosMemoryFree(buffer);
×
1083
    return TSDB_CODE_FILE_CORRUPTED;
×
1084
  }
1085

1086
  // Array of keys and their algorithms
1087
  const char *keys[] = {svrKey, dbKey, cfgKey, metaKey, dataKey};
×
1088
  int32_t     expectedAlgos[] = {algorithm, algorithm, cfgAlgorithm, metaAlgorithm, algorithm};
×
1089
  const char *keyNames[] = {"SVR_KEY", "DB_KEY", "CFG_KEY", "META_KEY", "DATA_KEY"};
×
1090

1091
  // Verify each key from buffer
1092
  for (int i = 0; i < 5; i++) {
×
1093
    // Check if we have enough data remaining
1094
    if (ptr - buffer + sizeof(int32_t) * 2 > fileSize) {
×
1095
      dError("unexpected end of file while reading %s metadata", keyNames[i]);
×
1096
      taosMemoryFree(buffer);
×
1097
      return TSDB_CODE_FILE_CORRUPTED;
×
1098
    }
1099

1100
    int32_t savedAlgo = 0;
×
1101
    int32_t encryptedLen = 0;
×
1102

1103
    memcpy(&savedAlgo, ptr, sizeof(int32_t));
×
1104
    ptr += sizeof(int32_t);
×
1105
    memcpy(&encryptedLen, ptr, sizeof(int32_t));
×
1106
    ptr += sizeof(int32_t);
×
1107

1108
    if (encryptedLen <= 0 || encryptedLen > 512) {
×
1109
      dError("invalid encrypted length %d for %s", encryptedLen, keyNames[i]);
×
1110
      taosMemoryFree(buffer);
×
1111
      return TSDB_CODE_FILE_CORRUPTED;
×
1112
    }
1113

1114
    if (ptr - buffer + encryptedLen > fileSize) {
×
1115
      dError("unexpected end of file while reading %s encrypted data", keyNames[i]);
×
1116
      taosMemoryFree(buffer);
×
1117
      return TSDB_CODE_FILE_CORRUPTED;
×
1118
    }
1119

1120
    uint8_t encrypted[512] = {0};
×
1121
    memcpy(encrypted, ptr, encryptedLen);
×
1122
    ptr += encryptedLen;
×
1123

1124
    // Decrypt with current key using CBC
1125
    char decrypted[512] = {0};
×
1126

1127
    SCryptOpts opts = {0};
×
1128
    opts.len = encryptedLen;
×
1129
    opts.source = (char *)encrypted;
×
1130
    opts.result = decrypted;
×
1131
    opts.unitLen = 16;
×
1132
    opts.pOsslAlgrName = TSDB_ENCRYPT_ALGO_SM4_STR;
×
1133
    tstrncpy(opts.key, keys[i], sizeof(opts.key));
×
1134

1135
    int32_t count = CBC_Decrypt(&opts);
×
1136
    if (count != opts.len) {
×
1137
      code = terrno ? terrno : TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1138
      dError("failed to decrypt verification for %s, count=%d, expected=%d, since %s - KEY IS INCORRECT", keyNames[i],
×
1139
             count, opts.len, tstrerror(code));
1140
      taosMemoryFree(buffer);
×
1141
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1142
    }
1143

1144
    // Verify decrypted data matches original plaintext (compare only the plaintext length)
1145
    if (memcmp(decrypted, plaintext, plaintextLen) != 0) {
×
1146
      dError("%s verification FAILED: decrypted text does not match - KEY IS INCORRECT", keyNames[i]);
×
1147
      taosMemoryFree(buffer);
×
1148
      return TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
1149
    }
1150

1151
    dInfo("%s verification passed (algorithm=%d)", keyNames[i], savedAlgo);
×
1152
  }
1153

1154
  taosMemoryFree(buffer);
×
1155
  dInfo("all encryption keys verified successfully");
×
1156
  return 0;
×
1157
}
1158

1159
// Public API: Verify and initialize encryption keys at startup
1160
int32_t dmVerifyAndInitEncryptionKeys(void) {
631,654✔
1161
  // Skip verification in dump sdb mode (taosd -s)
1162
  if (tsSkipKeyCheckMode) {
631,654✔
1163
    dInfo("skip encryption key verification in some special check mode");
1,560✔
1164
    return 0;
1,560✔
1165
  }
1166

1167
  // Check if encryption keys are loaded
1168
  if (tsEncryptKeysStatus != TSDB_ENCRYPT_KEY_STAT_LOADED) {
630,094✔
1169
    dDebug("encryption keys not loaded, skipping verification");
628,509✔
1170
    return 0;
628,509✔
1171
  }
1172

1173
  // Get key file paths
1174
  char    masterKeyFile[PATH_MAX] = {0};
1,585✔
1175
  char    derivedKeyFile[PATH_MAX] = {0};
1,585✔
1176
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
1,585✔
1177
                            TD_DIRSEP, TD_DIRSEP);
1178
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
1,585✔
1179
    dError("failed to build master key file path");
×
1180
    return TSDB_CODE_OUT_OF_BUFFER;
×
1181
  }
1182

1183
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
1,585✔
1184
                    TD_DIRSEP, TD_DIRSEP);
1185
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
1,585✔
1186
    dError("failed to build derived key file path");
×
1187
    return TSDB_CODE_OUT_OF_BUFFER;
×
1188
  }
1189

1190
  // Load encryption keys
1191
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
1,585✔
1192
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
1,585✔
1193
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
1,585✔
1194
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
1,585✔
1195
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
1,585✔
1196
  int32_t algorithm = 0;
1,585✔
1197
  int32_t cfgAlgorithm = 0;
1,585✔
1198
  int32_t metaAlgorithm = 0;
1,585✔
1199
  int32_t fileVersion = 0;
1,585✔
1200
  int32_t keyVersion = 0;
1,585✔
1201
  int64_t createTime = 0;
1,585✔
1202
  int64_t svrKeyUpdateTime = 0;
1,585✔
1203
  int64_t dbKeyUpdateTime = 0;
1,585✔
1204

1205
  int32_t code = taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey,
1,585✔
1206
                                      &algorithm, &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
1207
                                      &svrKeyUpdateTime, &dbKeyUpdateTime);
1208
  if (code != 0) {
1,585✔
1209
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1210
    return code;
×
1211
  }
1212

1213
  // Verify all keys
1214
  code = dmVerifyEncryptionKeys(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, cfgAlgorithm, metaAlgorithm);
1,585✔
1215
  if (code != 0) {
1,585✔
1216
    dError("encryption key verification failed, since %s", tstrerror(code));
×
1217
    return code;
×
1218
  }
1219

1220
  dInfo("encryption keys verified and initialized successfully");
1,585✔
1221
  return 0;
1,585✔
1222
}
1223
#else
1224
int32_t dmVerifyAndInitEncryptionKeys(void) {
1225
  // Community edition or no TaosK support
1226
  return 0;
1227
}
1228
#endif
1229

1230
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1231
static int32_t dmUpdateSvrKey(const char *newKey) {
133✔
1232
  if (newKey == NULL || newKey[0] == '\0') {
133✔
1233
    dError("invalid new SVR_KEY, key is empty");
×
1234
    return TSDB_CODE_INVALID_PARA;
×
1235
  }
1236

1237
  char masterKeyFile[PATH_MAX] = {0};
133✔
1238
  char derivedKeyFile[PATH_MAX] = {0};
133✔
1239

1240
  // Build path to key files
1241
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
133✔
1242
                            TD_DIRSEP, TD_DIRSEP);
1243
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
133✔
1244
    dError("failed to build master key file path");
×
1245
    return TSDB_CODE_OUT_OF_BUFFER;
×
1246
  }
1247

1248
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
133✔
1249
                    TD_DIRSEP, TD_DIRSEP);
1250
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
133✔
1251
    dError("failed to build derived key file path");
×
1252
    return TSDB_CODE_OUT_OF_BUFFER;
×
1253
  }
1254

1255
  // Load current keys
1256
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1257
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1258
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1259
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1260
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1261
  int32_t algorithm = 0;
133✔
1262
  int32_t cfgAlgorithm = 0;
133✔
1263
  int32_t metaAlgorithm = 0;
133✔
1264
  int32_t fileVersion = 0;
133✔
1265
  int32_t keyVersion = 0;
133✔
1266
  int64_t createTime = 0;
133✔
1267
  int64_t svrKeyUpdateTime = 0;
133✔
1268
  int64_t dbKeyUpdateTime = 0;
133✔
1269

1270
  int32_t code =
1271
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
133✔
1272
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1273
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1274
  if (code != 0) {
133✔
1275
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1276
    return code;
×
1277
  }
1278

1279
  // Update SVR_KEY
1280
  int64_t now = taosGetTimestampMs();
133✔
1281
  int32_t newKeyVersion = keyVersion + 1;
133✔
1282

1283
  dInfo("updating SVR_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
133✔
1284
  tstrncpy(svrKey, newKey, sizeof(svrKey));
133✔
1285
  svrKeyUpdateTime = now;
133✔
1286

1287
  // Save updated keys (use algorithm for all keys for backward compatibility)
1288
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
133✔
1289
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1290
  if (code != 0) {
133✔
1291
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1292
    return code;
×
1293
  }
1294

1295
  // Update key verification file with new SVR_KEY
1296
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
133✔
1297
  if (code != 0) {
133✔
1298
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1299
    // Don't fail the operation if verification file update fails
1300
  }
1301

1302
  // Update global variables
1303
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
133✔
1304
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
133✔
1305
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
133✔
1306
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
133✔
1307
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
133✔
1308
  tsEncryptAlgorithmType = algorithm;
133✔
1309
  tsEncryptFileVersion = fileVersion;
133✔
1310
  tsEncryptKeyVersion = newKeyVersion;
133✔
1311
  tsEncryptKeyCreateTime = createTime;
133✔
1312
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
133✔
1313
  tsDbKeyUpdateTime = dbKeyUpdateTime;
133✔
1314

1315
  // Update encryption key status for backward compatibility
1316
  int keyLen = strlen(tsDataKey);
133✔
1317
  if (keyLen > ENCRYPT_KEY_LEN) {
133✔
1318
    keyLen = ENCRYPT_KEY_LEN;
×
1319
  }
1320
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
133✔
1321
  memcpy(tsEncryptKey, tsDataKey, keyLen);
133✔
1322
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
133✔
1323
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
133✔
1324

1325
  dInfo("successfully updated SVR_KEY to version:%d", newKeyVersion);
133✔
1326
  return 0;
133✔
1327
}
1328

1329
static int32_t dmUpdateKeyExpiration(int32_t days, const char *strategy) {
×
1330
  if (days < 0) {
×
1331
    dError("invalid days value:%d, must be >= 0", days);
×
1332
    return TSDB_CODE_INVALID_PARA;
×
1333
  }
1334

1335
  if (strategy == NULL || strategy[0] == '\0') {
×
1336
    dError("invalid strategy, strategy is empty");
×
1337
    return TSDB_CODE_INVALID_PARA;
×
1338
  }
1339

1340
  // Validate strategy value
1341
  if (strcmp(strategy, "ALARM") != 0) {
×
1342
    dWarn("unknown strategy:%s, supported values: ALARM. Will use it anyway.", strategy);
×
1343
  }
1344

1345
  // Update global variables directly
1346
  tsKeyExpirationDays = days;
×
1347
  tstrncpy(tsKeyExpirationStrategy, strategy, sizeof(tsKeyExpirationStrategy));
×
1348

1349
  dInfo("successfully updated key expiration config: days=%d, strategy=%s", days, strategy);
×
1350
  return 0;
×
1351
}
1352

1353
static int32_t dmUpdateDbKey(const char *newKey) {
133✔
1354
  if (newKey == NULL || newKey[0] == '\0') {
133✔
1355
    dError("invalid new DB_KEY, key is empty");
×
1356
    return TSDB_CODE_INVALID_PARA;
×
1357
  }
1358

1359
  char masterKeyFile[PATH_MAX] = {0};
133✔
1360
  char derivedKeyFile[PATH_MAX] = {0};
133✔
1361

1362
  // Build path to key files
1363
  int32_t nBytes = snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
133✔
1364
                            TD_DIRSEP, TD_DIRSEP);
1365
  if (nBytes <= 0 || nBytes >= sizeof(masterKeyFile)) {
133✔
1366
    dError("failed to build master key file path");
×
1367
    return TSDB_CODE_OUT_OF_BUFFER;
×
1368
  }
1369

1370
  nBytes = snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
133✔
1371
                    TD_DIRSEP, TD_DIRSEP);
1372
  if (nBytes <= 0 || nBytes >= sizeof(derivedKeyFile)) {
133✔
1373
    dError("failed to build derived key file path");
×
1374
    return TSDB_CODE_OUT_OF_BUFFER;
×
1375
  }
1376

1377
  // Load current keys
1378
  char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1379
  char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1380
  char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1381
  char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1382
  char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
133✔
1383
  int32_t algorithm = 0;
133✔
1384
  int32_t cfgAlgorithm = 0;
133✔
1385
  int32_t metaAlgorithm = 0;
133✔
1386
  int32_t fileVersion = 0;
133✔
1387
  int32_t keyVersion = 0;
133✔
1388
  int64_t createTime = 0;
133✔
1389
  int64_t svrKeyUpdateTime = 0;
133✔
1390
  int64_t dbKeyUpdateTime = 0;
133✔
1391

1392
  int32_t code =
1393
      taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
133✔
1394
                           &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
1395
                           &svrKeyUpdateTime, &dbKeyUpdateTime);
1396
  if (code != 0) {
133✔
1397
    dError("failed to load encryption keys, since %s", tstrerror(code));
×
1398
    return code;
×
1399
  }
1400

1401
  // Update DB_KEY
1402
  int64_t now = taosGetTimestampMs();
133✔
1403
  int32_t newKeyVersion = keyVersion + 1;
133✔
1404

1405
  dInfo("updating DB_KEY, old version:%d, new version:%d", keyVersion, newKeyVersion);
133✔
1406
  tstrncpy(dbKey, newKey, sizeof(dbKey));
133✔
1407
  dbKeyUpdateTime = now;
133✔
1408

1409
  // Save updated keys (use algorithm for all keys for backward compatibility)
1410
  code = taoskSaveEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm,
133✔
1411
                              algorithm, algorithm, newKeyVersion, createTime, svrKeyUpdateTime, dbKeyUpdateTime);
1412
  if (code != 0) {
133✔
1413
    dError("failed to save updated encryption keys, since %s", tstrerror(code));
×
1414
    return code;
×
1415
  }
1416

1417
  // Update key verification file with new DB_KEY
1418
  code = dmSaveKeyVerification(svrKey, dbKey, cfgKey, metaKey, dataKey, algorithm, algorithm, algorithm);
133✔
1419
  if (code != 0) {
133✔
1420
    dWarn("failed to update key verification file, since %s", tstrerror(code));
×
1421
    // Don't fail the operation if verification file update fails
1422
  }
1423

1424
  // Update global variables
1425
  tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
133✔
1426
  tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
133✔
1427
  tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
133✔
1428
  tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
133✔
1429
  tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
133✔
1430
  tsEncryptAlgorithmType = algorithm;
133✔
1431
  tsEncryptFileVersion = fileVersion;
133✔
1432
  tsEncryptKeyVersion = newKeyVersion;
133✔
1433
  tsEncryptKeyCreateTime = createTime;
133✔
1434
  tsSvrKeyUpdateTime = svrKeyUpdateTime;
133✔
1435
  tsDbKeyUpdateTime = dbKeyUpdateTime;
133✔
1436

1437
  // Update encryption key status for backward compatibility
1438
  int keyLen = strlen(tsDataKey);
133✔
1439
  if (keyLen > ENCRYPT_KEY_LEN) {
133✔
1440
    keyLen = ENCRYPT_KEY_LEN;
×
1441
  }
1442
  memset(tsEncryptKey, 0, ENCRYPT_KEY_LEN + 1);
133✔
1443
  memcpy(tsEncryptKey, tsDataKey, keyLen);
133✔
1444
  tsEncryptKey[ENCRYPT_KEY_LEN] = '\0';
133✔
1445
  tsEncryptionKeyChksum = taosCalcChecksum(0, (const uint8_t *)tsEncryptKey, strlen(tsEncryptKey));
133✔
1446

1447
  dInfo("successfully updated DB_KEY to version:%d", newKeyVersion);
133✔
1448
  return 0;
133✔
1449
}
1450
#endif
1451

1452
int32_t dmProcessAlterEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
266✔
1453
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1454
  int32_t              code = 0;
266✔
1455
  SMAlterEncryptKeyReq alterKeyReq = {0};
266✔
1456
  if (tDeserializeSMAlterEncryptKeyReq(pMsg->pCont, pMsg->contLen, &alterKeyReq) != 0) {
266✔
1457
    code = TSDB_CODE_INVALID_MSG;
×
1458
    dError("failed to deserialize alter encrypt key req, since %s", tstrerror(code));
×
1459
    goto _exit;
×
1460
  }
1461

1462
  dInfo("received alter encrypt key req, keyType:%d", alterKeyReq.keyType);
266✔
1463

1464
  // Update the specified key (svr_key or db_key)
1465
  if (alterKeyReq.keyType == 0) {
266✔
1466
    // Update SVR_KEY
1467
    code = dmUpdateSvrKey(alterKeyReq.newKey);
133✔
1468
    if (code == 0) {
133✔
1469
      dInfo("successfully updated SVR_KEY");
133✔
1470
    } else {
1471
      dError("failed to update SVR_KEY, since %s", tstrerror(code));
×
1472
    }
1473
  } else if (alterKeyReq.keyType == 1) {
133✔
1474
    // Update DB_KEY
1475
    code = dmUpdateDbKey(alterKeyReq.newKey);
133✔
1476
    if (code == 0) {
133✔
1477
      dInfo("successfully updated DB_KEY");
133✔
1478
    } else {
1479
      dError("failed to update DB_KEY, since %s", tstrerror(code));
×
1480
    }
1481
  } else {
1482
    dError("invalid keyType:%d, must be 0 (SVR_KEY) or 1 (DB_KEY)", alterKeyReq.keyType);
×
1483
    code = TSDB_CODE_INVALID_PARA;
×
1484
  }
1485

1486
_exit:
266✔
1487
  tFreeSMAlterEncryptKeyReq(&alterKeyReq);
266✔
1488
  pMsg->code = code;
266✔
1489
  pMsg->info.rsp = NULL;
266✔
1490
  pMsg->info.rspLen = 0;
266✔
1491
  return code;
266✔
1492
#else
1493
  dError("encryption key management is only available in enterprise edition");
1494
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1495
  pMsg->info.rsp = NULL;
1496
  pMsg->info.rspLen = 0;
1497
  return TSDB_CODE_OPS_NOT_SUPPORT;
1498
#endif
1499
}
1500

1501
int32_t dmProcessAlterKeyExpirationReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1502
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
1503
  int32_t                 code = 0;
×
1504
  SMAlterKeyExpirationReq alterReq = {0};
×
1505
  if (tDeserializeSMAlterKeyExpirationReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
×
1506
    code = TSDB_CODE_INVALID_MSG;
×
1507
    dError("failed to deserialize alter key expiration req, since %s", tstrerror(code));
×
1508
    goto _exit;
×
1509
  }
1510

1511
  dInfo("received alter key expiration req, days:%d, strategy:%s", alterReq.days, alterReq.strategy);
×
1512

1513
  // Update key expiration configuration
1514
  code = dmUpdateKeyExpiration(alterReq.days, alterReq.strategy);
×
1515
  if (code == 0) {
×
1516
    dInfo("successfully updated key expiration: %d days, strategy: %s", alterReq.days, alterReq.strategy);
×
1517
  } else {
1518
    dError("failed to update key expiration, since %s", tstrerror(code));
×
1519
  }
1520

1521
_exit:
×
1522
  tFreeSMAlterKeyExpirationReq(&alterReq);
×
1523
  pMsg->code = code;
×
1524
  pMsg->info.rsp = NULL;
×
1525
  pMsg->info.rspLen = 0;
×
1526
  return code;
×
1527
#else
1528
  dError("key expiration management is only available in enterprise edition");
1529
  pMsg->code = TSDB_CODE_OPS_NOT_SUPPORT;
1530
  pMsg->info.rsp = NULL;
1531
  pMsg->info.rspLen = 0;
1532
  return TSDB_CODE_OPS_NOT_SUPPORT;
1533
#endif
1534
}
1535

1536
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1537
  int32_t code = 0;
×
1538
  int32_t lino = 0;
×
1539
  SMsgCb *msgCb = &pMgmt->msgCb;
×
1540
  void *pTransCli = msgCb->clientRpc;
×
1541
  void *pTransStatus = msgCb->statusRpc;  
×
1542
  void *pTransSync = msgCb->syncRpc; 
×
1543
  void *pTransServer = msgCb->serverRpc;
×
1544

1545
  code = rpcReloadTlsConfig(pTransServer, TAOS_CONN_SERVER);
×
1546
  if (code != 0) {
×
1547
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(code));
×
1548
    goto _error;
×
1549
  }
1550

1551
  code = rpcReloadTlsConfig(pTransCli, TAOS_CONN_CLIENT);
×
1552
  if (code != 0) {
×
1553
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(code));
×
1554
    goto _error;
×
1555
  }
1556

1557
  code = rpcReloadTlsConfig(pTransStatus, TAOS_CONN_CLIENT);
×
1558
  if (code != 0) {
×
1559
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(code));
×
1560
    goto _error;
×
1561
  }
1562

1563
  code = rpcReloadTlsConfig(pTransSync, TAOS_CONN_CLIENT);
×
1564
  if (code != 0) {
×
1565
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(code));
×
1566
    goto _error;
×
1567
  }
1568

1569
_error:
×
1570
  
1571
  return code;
×
1572
}
1573

1574
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
168✔
1575
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
168✔
1576
  pStatus->details[0] = 0;
168✔
1577

1578
  SMonMloadInfo minfo = {0};
168✔
1579
  (*pMgmt->getMnodeLoadsFp)(&minfo);
168✔
1580
  if (minfo.isMnode &&
168✔
1581
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
168✔
1582
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1583
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
1584
    return;
×
1585
  }
1586

1587
  SMonVloadInfo vinfo = {0};
168✔
1588
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
168✔
1589
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
504✔
1590
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
336✔
1591
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
336✔
1592
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
1593
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
1594
               syncStr(pLoad->syncState));
×
1595
      break;
×
1596
    }
1597
  }
1598

1599
  taosArrayDestroy(vinfo.pVloads);
168✔
1600
}
1601

1602
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
168✔
1603
  int32_t code = 0;
168✔
1604
  dDebug("server run status req is received");
168✔
1605
  SServerStatusRsp statusRsp = {0};
168✔
1606
  dmGetServerRunStatus(pMgmt, &statusRsp);
168✔
1607

1608
  pMsg->info.rsp = NULL;
168✔
1609
  pMsg->info.rspLen = 0;
168✔
1610

1611
  SRpcMsg rspMsg = {.info = pMsg->info};
168✔
1612
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
168✔
1613
  if (rspLen < 0) {
168✔
1614
    return TSDB_CODE_OUT_OF_MEMORY;
×
1615
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1616
    // return rspMsg.code;
1617
  }
1618

1619
  void *pRsp = rpcMallocCont(rspLen);
168✔
1620
  if (pRsp == NULL) {
168✔
1621
    return terrno;
×
1622
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
1623
    // return rspMsg.code;
1624
  }
1625

1626
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
168✔
1627
  if (rspLen < 0) {
168✔
1628
    return TSDB_CODE_INVALID_MSG;
×
1629
  }
1630

1631
  pMsg->info.rsp = pRsp;
168✔
1632
  pMsg->info.rspLen = rspLen;
168✔
1633
  return 0;
168✔
1634
}
1635

1636
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
24,926✔
1637
  int32_t code = 0;
24,926✔
1638

1639
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
24,926✔
1640
  if (pBlock == NULL) {
24,926✔
1641
    return terrno;
×
1642
  }
1643

1644
  size_t size = 0;
24,926✔
1645

1646
  const SSysTableMeta *pMeta = NULL;
24,926✔
1647
  getInfosDbMeta(&pMeta, &size);
24,926✔
1648

1649
  int32_t index = 0;
24,926✔
1650
  for (int32_t i = 0; i < size; ++i) {
498,520✔
1651
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
498,520✔
1652
      index = i;
24,926✔
1653
      break;
24,926✔
1654
    }
1655
  }
1656

1657
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
24,926✔
1658
  if (pBlock->pDataBlock == NULL) {
24,926✔
1659
    code = terrno;
×
1660
    goto _exit;
×
1661
  }
1662

1663
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
174,482✔
1664
    SColumnInfoData colInfoData = {0};
149,556✔
1665
    colInfoData.info.colId = i + 1;
149,556✔
1666
    colInfoData.info.type = pMeta[index].schema[i].type;
149,556✔
1667
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
149,556✔
1668
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
299,112✔
1669
      code = terrno;
×
1670
      goto _exit;
×
1671
    }
1672
  }
1673

1674
  pBlock->info.hasVarCol = true;
24,926✔
1675
_exit:
24,926✔
1676
  if (code != 0) {
24,926✔
1677
    blockDataDestroy(pBlock);
×
1678
  } else {
1679
    *ppBlock = pBlock;
24,926✔
1680
  }
1681
  return code;
24,926✔
1682
}
1683

1684
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
24,926✔
1685
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);
24,926✔
1686
  if (code != 0) {
24,926✔
1687
    return code;
×
1688
  }
1689

1690
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
24,926✔
1691
  if (pColInfo == NULL) {
24,926✔
1692
    return TSDB_CODE_OUT_OF_RANGE;
×
1693
  }
1694

1695
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
24,926✔
1696
}
1697

1698
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
24,926✔
1699
  int32_t           size = 0;
24,926✔
1700
  int32_t           rowsRead = 0;
24,926✔
1701
  int32_t           code = 0;
24,926✔
1702
  SRetrieveTableReq retrieveReq = {0};
24,926✔
1703
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
24,926✔
1704
    return TSDB_CODE_INVALID_MSG;
×
1705
  }
1706
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
24,926✔
1707
#if 0
1708
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
1709
    code = TSDB_CODE_MND_NO_RIGHTS;
1710
    return code;
1711
  }
1712
#endif
1713
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
24,926✔
1714
    return TSDB_CODE_INVALID_MSG;
×
1715
  }
1716

1717
  SSDataBlock *pBlock = NULL;
24,926✔
1718
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
24,926✔
1719
    return code;
×
1720
  }
1721

1722
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
24,926✔
1723
  if (code != 0) {
24,926✔
1724
    blockDataDestroy(pBlock);
×
1725
    return code;
×
1726
  }
1727

1728
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
24,926✔
1729
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
24,926✔
1730
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
24,926✔
1731

1732
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
24,926✔
1733
  if (pRsp == NULL) {
24,926✔
1734
    code = terrno;
×
1735
    dError("failed to retrieve data since %s", tstrerror(code));
×
1736
    blockDataDestroy(pBlock);
×
1737
    return code;
×
1738
  }
1739

1740
  char *pStart = pRsp->data;
24,926✔
1741
  *(int32_t *)pStart = htonl(numOfCols);
24,926✔
1742
  pStart += sizeof(int32_t);  // number of columns
24,926✔
1743

1744
  for (int32_t i = 0; i < numOfCols; ++i) {
174,482✔
1745
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
149,556✔
1746
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
149,556✔
1747

1748
    pSchema->bytes = htonl(pColInfo->info.bytes);
149,556✔
1749
    pSchema->colId = htons(pColInfo->info.colId);
149,556✔
1750
    pSchema->type = pColInfo->info.type;
149,556✔
1751

1752
    pStart += sizeof(SSysTableSchema);
149,556✔
1753
  }
1754

1755
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
24,926✔
1756
  if (len < 0) {
24,926✔
1757
    dError("failed to retrieve data since %s", tstrerror(code));
×
1758
    blockDataDestroy(pBlock);
×
1759
    rpcFreeCont(pRsp);
×
1760
    return terrno;
×
1761
  }
1762

1763
  pRsp->numOfRows = htonl(pBlock->info.rows);
24,926✔
1764
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
24,926✔
1765
  pRsp->completed = 1;
24,926✔
1766
  pMsg->info.rsp = pRsp;
24,926✔
1767
  pMsg->info.rspLen = size;
24,926✔
1768
  dDebug("dnode variables retrieve completed");
24,926✔
1769

1770
  blockDataDestroy(pBlock);
24,926✔
1771
  return TSDB_CODE_SUCCESS;
24,926✔
1772
}
1773

1774
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
17,216,498✔
1775
  SMStreamHbRspMsg rsp = {0};
17,216,498✔
1776
  int32_t          code = 0;
17,216,498✔
1777
  SDecoder         decoder;
17,212,579✔
1778
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
17,216,498✔
1779
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
17,216,498✔
1780
  int64_t          currTs = taosGetTimestampMs();
17,216,498✔
1781

1782
  if (pMsg->code) {
17,216,498✔
1783
    return streamHbHandleRspErr(pMsg->code, currTs);
163,524✔
1784
  }
1785

1786
  tDecoderInit(&decoder, (uint8_t*)msg, len);
17,052,974✔
1787
  code = tDecodeStreamHbRsp(&decoder, &rsp);
17,052,974✔
1788
  if (code < 0) {
17,052,974✔
1789
    code = TSDB_CODE_INVALID_MSG;
×
1790
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
1791
    tDecoderClear(&decoder);
×
1792
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
1793
    return streamHbHandleRspErr(code, currTs);
×
1794
  }
1795

1796
  tDecoderClear(&decoder);
17,052,974✔
1797

1798
  return streamHbProcessRspMsg(&rsp);
17,052,974✔
1799
}
1800

1801

1802
SArray *dmGetMsgHandles() {
631,749✔
1803
  int32_t code = -1;
631,749✔
1804
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
631,749✔
1805
  if (pArray == NULL) {
631,749✔
1806
    return NULL;
×
1807
  }
1808

1809
  // Requests handled by DNODE
1810
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1811
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1812
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1813
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1814
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1815
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1816
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1817
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1818
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1819
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1820
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1821
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1822
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1823
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1824
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1825
  if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_KEY_EXPIRATION, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1826
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1827
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1828

1829
  // Requests handled by MNODE
1830
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1831
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1832
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
631,749✔
1833

1834
  code = 0;
631,749✔
1835

1836
_OVER:
631,749✔
1837
  if (code != 0) {
631,749✔
1838
    taosArrayDestroy(pArray);
×
1839
    return NULL;
×
1840
  } else {
1841
    return pArray;
631,749✔
1842
  }
1843
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc