• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4891

20 Dec 2025 03:05AM UTC coverage: 65.549% (+2.7%) from 62.824%
#4891

push

travis-ci

web-flow
merge: from main to 3.0 branch #33992

182743 of 278788 relevant lines covered (65.55%)

104071702.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.56
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "monitor.h"
21
#include "stream.h"
22
#include "systable.h"
23
#include "tanalytics.h"
24
#include "tchecksum.h"
25
#include "tutil.h"
26

27
extern SConfig *tsCfg;
28
extern void setAuditDbNameToken(char *pDb, char *pToken);
29

30
#ifndef TD_ENTERPRISE
31
/* No-op fallback used when TD_ENTERPRISE is not defined: both arguments are ignored. */
void setAuditDbNameToken(char *pDb, char *pToken) {
  (void)pDb;
  (void)pToken;
}
32
#endif
33

34
extern void getAuditDbNameToken(char *pDb, char *pToken);
35

36
#ifndef TD_ENTERPRISE
37
/* No-op fallback used when TD_ENTERPRISE is not defined: the output buffers are left untouched. */
void getAuditDbNameToken(char *pDb, char *pToken) {
  (void)pDb;
  (void)pToken;
}
38
#endif
39

40
SMonVloadInfo tsVinfo = {0};
41
SMnodeLoad    tsMLoad = {0};
42
SDnodeData    tsDnodeData = {0};
43

44
/*
 * Adopt the dnode id and cluster id carried in pCfg when either local id is
 * still zero, forward the dnode id to the monitor and audit modules, and
 * persist the result through dmWriteEps().
 *
 * A persistence failure is logged as an error; the in-memory ids stay updated.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  // NOTE(review): the ids are tested before the write lock is taken — confirm no concurrent writer exists.
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);

  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // A failed write is an error condition, so report it at error level like the other failures in this file.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
2,618,639✔
61

62
/*
 * Compare the local ip-white-list version with the one reported in a status
 * response (ver).
 *
 *  - Equal versions: nothing to fetch; when the version is 0 the white list on
 *    the server transport is cleared via rpcSetIpWhite(..., NULL).
 *  - Different versions: send a TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL request,
 *    carrying the local version, to the current mnode ep set.
 *
 * All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }

  int64_t               oldVer = pMgmt->pData->ipWhiteVer;
  SRetrieveWhiteListReq req = {.ver = oldVer};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc ip white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
107

108

109

110
/*
 * Compare the local datetime-white-list version with the one reported in a
 * status response (ver).
 *
 *  - Equal versions: nothing to fetch; when the version is 0 the list is
 *    disabled on the server transport.
 *  - Different versions: send a TDMT_MND_RETRIEVE_DATETIME_WHITELIST request,
 *    carrying the local version, to the current mnode ep set.
 *
 * All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this calls the *ip* white list setter from the time-white-list path — confirm it is intended.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }

  int64_t               oldVer = pMgmt->pData->timeWhiteVer;
  SRetrieveWhiteListReq req = {.ver = oldVer};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc datetime white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
155

156

157

158
/*
 * Request the analytics algorithm list from the mnode when the version
 * reported in a status response (newVer) differs from the local one returned
 * by taosAnalyGetVersion().
 *
 * The request carries the local dnode id and the local version. All failures
 * are logged and otherwise ignored.
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) {
    return;
  }
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc analysis function ver request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  // NOTE(review): unlike the white-list requests, .info.notFreeAhandle is left at 0 here — confirm intended.
  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
197

198
/*
 * Handle the mnode's reply to a status request and release its payload.
 *
 *  - Success code: decode the SStatusRsp (if a payload is present); on a
 *    dnodeVer change refresh the dnode config and endpoint list, then push the
 *    audit db/token and check the ip/time white-list and analytics versions.
 *  - TSDB_CODE_MND_DNODE_NOT_EXIST for a dnode that has an id and is not yet
 *    marked dropped: mark it dropped, persist, and raise SIGINT.
 *  - Any other error code: nothing besides freeing the payload.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Presumably consumed implicitly by the dG* log macros — keep the name.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       decoded = pRsp->pCont != NULL && pRsp->contLen > 0 &&
                   tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0;

    if (decoded) {
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }

      // NOTE(review): the audit token is written to the info log on every decoded status rsp — confirm acceptable.
      dGInfo("dnode:%d, set auditDB:%s, token:%s in status rsp received from mnode", pMgmt->pData->dnodeId,
             statusRsp.auditDB, statusRsp.auditToken);
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken);

      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }

    // Released even when decoding was skipped or failed.
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
41,273,680✔
235

236
/*
 * Build a status request from the cached dnode/vnode/mnode information and
 * the current configuration, send it synchronously to the mnode, and hand the
 * reply to dmProcessStatusRsp().
 *
 * The cached data (tsDnodeData, tsVinfo, tsMLoad) is read under
 * statusInfolock; ownership of tsVinfo.pVloads moves into the request, so the
 * request is released with tFreeSStatusReq() on every exit path after that
 * point. On a send failure or an error reply the mnode ep set is rotated.
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);

  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the log below reports the actual failure instead of tstrerror(0).
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Take ownership of the cached vnode loads; from here on req must be freed on every return.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  getAuditDbNameToken(req.auditDB, req.auditToken);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc status req, size:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }

  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  // The request is fully encoded (or encoding failed); either way its resources are no longer needed.
  tFreeSStatusReq(&req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar fields of req are read below, after it has been freed.
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }

  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // The failure reason is in the call's return code, not in the (unfilled) response.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
371

372
/*
 * Handle the mnode's reply to a config request and release its payload.
 *
 *  - Error reply: only TSDB_CODE_MND_DNODE_NOT_EXIST is acted upon (mark the
 *    dnode dropped, persist, raise SIGINT) when the dnode has an id and is not
 *    already dropped.
 *  - Success reply: when the decoded response says the config version was not
 *    verified, persist the received global config and apply it to tsCfg; then
 *    persist the local config and set tsConfigInited.
 *
 * Any failure while persisting/applying the global config aborts the rest of
 * the success path (tsConfigInited is not set in that case).
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Presumably consumed implicitly by the dG* log macros — keep the name.
  const STraceId *trace = &pRsp->info.traceId;
  SConfigRsp      configRsp = {0};
  int32_t         code = 0;
  bool            decoded = false;
  bool            needStop = false;  // never set in this function; kept to preserve the original exit path

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  decoded = pRsp->pCont != NULL && pRsp->contLen > 0 &&
            tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0;

  // Try to use cfg from mnode sdb.
  if (decoded && !configRsp.isVersionVerified) {
    uInfo("config version not verified, update config");

    code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist global config since %s", tstrerror(code));
      goto _exit;
    }

    code = cfgUpdateFromArray(tsCfg, configRsp.array);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to update config since %s", tstrerror(code));
      goto _exit;
    }

    code = setAllConfigs(tsCfg);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to set all configs since %s", tstrerror(code));
      goto _exit;
    }
  }

  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    // Logged only; the config is still flagged as initialized below.
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
  if (needStop) {
    dmStop();
  }
}
713,359✔
428

429
/*
 * Send the local global-config snapshot and config version to the mnode and
 * pass a successful reply to dmProcessConfigRsp().
 *
 * The call is synchronous, bounded by tsStatusSRTimeoutMs. On any failure
 * (serialization, allocation, transport, or an error code in the reply) the
 * problem is logged and the function returns without processing a response.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so report it as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // The response payload is normally released by dmProcessConfigRsp(); release it here since that call is skipped.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
482

483
/*
 * Refresh the cached status data (tsDnodeData, tsVinfo, tsMLoad) that
 * dmSendStatusReq() later reads.
 *
 * Dnode identity fields are copied under the pData rwlock, vnode and mnode
 * loads are collected through the registered callbacks, and the results are
 * published under statusInfolock. The freshly collected vnode loads replace
 * the cache only when the cache is empty; otherwise they are destroyed.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected loads cannot be published; release them instead of leaking the array.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }

  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // Cache still holds an array; drop the newly collected one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
530

531
/*
 * Serialize pReq and send it to the mnode as a TDMT_MND_NOTIFY message that
 * expects no response (info.noResp = 1).
 *
 * Failures are logged and otherwise ignored.
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc notify req, size:%d", contLen);
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
560

561
/*
 * Placeholder handler for auth responses: logs that the message is not
 * supported and reports success. Both arguments are unused.
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("auth rsp is received, but not supported yet");
  return 0;
}
565

566
/*
 * Placeholder handler for grant responses: logs that the message is not
 * supported and reports success. Both arguments are unused.
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("grant rsp is received, but not supported yet");
  return 0;
}
570

571
/*
 * Apply a "config dnode" request carried in pMsg.
 *
 * Flow:
 *  1. Decode the request; "dataDir" is delegated to taosUpdateTfsItemDisable().
 *  2. Store the new value with cfgGetAndSetItem(). If that fails and the
 *     option name starts with "resetlog", only the dynamic option is applied.
 *  3. For "syncTimeout", derive and store the dependent arb/sync/status
 *     timeouts from the new value.
 *  4. Unless the item is in lazy mode, apply the option(s) dynamically.
 *  5. Persist the global or local config depending on the item's category,
 *     notify the mnode/vnode sync-timeout callbacks where relevant, and adopt
 *     the request's config version when it is positive.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be decoded,
 * TSDB_CODE_CFG_NOT_FOUND when an item lookup yields NULL, the failing code of
 * any checked step, or otherwise the result of the persist step.
 */
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t       code = 0;
  SDCfgDnodeReq cfgReq = {0};
  SConfig      *pCfg = taosGetCfg();
  SConfigItem  *pItem = NULL;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
  }

  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);

  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
  if (code != 0) {
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
      return TSDB_CODE_SUCCESS;
    } else {
      return code;
    }
  }
  if (pItem == NULL) {
    return TSDB_CODE_CFG_NOT_FOUND;
  }

  // The option name does not change below, so compare it once.
  const bool isSyncTimeout = (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0);

  if (isSyncTimeout) {
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
      tsSyncTimeout = 0;
    }

    if (tsSyncTimeout > 0) {
      SConfigItem *pItemTmp = NULL;
      // Large enough for any 32-bit decimal value plus the terminator
      // (a 10-digit value needs 11 bytes).
      char tmp[16] = {0};

      (void)snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout);
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }

      (void)snprintf(tmp, sizeof(tmp), "%d", tsSyncTimeout / 4);
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }

      (void)snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }

      (void)snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }

      (void)snprintf(tmp, sizeof(tmp), "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }
      TAOS_CHECK_RETURN(
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
      if (pItemTmp == NULL) {
        return TSDB_CODE_CFG_NOT_FOUND;
      }

      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
            tsSyncTimeout);
    }
  }

  if (!isConifgItemLazyMode(pItem)) {
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));

    if (isSyncTimeout) {
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));

      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));

      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));

      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
            tsSyncTimeout);
    }
  }

  if (pItem->category == CFG_CATEGORY_GLOBAL) {
    // NOTE(review): this persists with the current tsdmConfigVersion; cfgReq.version is adopted only afterwards — confirm.
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist global config since %s", tstrerror(code));
    }
  } else {
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
  }

  if (isSyncTimeout) {
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);

    (*pMgmt->setMnodeSyncTimeoutFp)();
    (*pMgmt->setVnodeSyncTimeoutFp)();
  }

  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
    (*pMgmt->setVnodeSyncTimeoutFp)();
  }

  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
    (*pMgmt->setMnodeSyncTimeoutFp)();
  }

  if (cfgReq.version > 0) {
    tsdmConfigVersion = cfgReq.version;
  }
  // A persist failure above is reported to the caller through this return value.
  return code;
}
727

728
/*
 * Handle a "create encrypt key" request.
 *
 * With TD_ENTERPRISE defined: decode the request, install the key via
 * dmUpdateEncryptKey(), and on success record its checksum, mark the key as
 * loaded and copy it into tsEncryptKey. The resulting code is stored in
 * pMsg->code (with an empty response body) and returned;
 * TSDB_CODE_INVALID_MSG is used when decoding fails.
 *
 * Without TD_ENTERPRISE: does nothing and returns 0.
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq cfgReq = {0};
  int32_t       code = TSDB_CODE_INVALID_MSG;  // result when the request cannot be decoded

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) == 0) {
    code = dmUpdateEncryptKey(cfgReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  return 0;
#endif
}
753

754
// Reloads the TLS configuration of every RPC transport owned by this dnode.
//
// The transports are processed in a fixed order: server, client, status
// client, sync client. Processing stops at the first transport whose reload
// fails; that failure is logged and its error code is returned. Transports
// that were already reloaded are left as they are.
//
// pMsg is not read or modified.
//
// Returns 0 when all transports were reloaded, otherwise the first error
// returned by rpcReloadTlsConfig().
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMsgCb *msgCb = &pMgmt->msgCb;

  // Ordered list of transports to reload; `name` is only used for logging.
  const struct {
    void       *pTrans;
    int8_t      connType;
    const char *name;
  } transports[] = {
      {msgCb->serverRpc, TAOS_CONN_SERVER, "server"},
      {msgCb->clientRpc, TAOS_CONN_CLIENT, "cli"},
      {msgCb->statusRpc, TAOS_CONN_CLIENT, "status-cli"},
      {msgCb->syncRpc, TAOS_CONN_CLIENT, "sync-cli"},
  };

  for (size_t i = 0; i < sizeof(transports) / sizeof(transports[0]); ++i) {
    int32_t code = rpcReloadTlsConfig(transports[i].pTrans, transports[i].connType);
    if (code != 0) {
      dError("failed to reload tls config for transport %s since %s", transports[i].name, tstrerror(code));
      return code;
    }
  }

  return 0;
}
791

792
// Fills pStatus with the current health of this dnode.
//
// The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
// downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a human-readable
// reason in pStatus->details, when either
//   - this dnode hosts an mnode whose sync state is ERROR or OFFLINE, or
//   - any vnode reported by getVnodeLoadsFp is in sync state ERROR or OFFLINE
//     (only the first such vnode is reported).
// The mnode check takes precedence: if it fails, vnodes are not inspected.
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&mnodeInfo);

  bool mnodeUnhealthy = mnodeInfo.isMnode && (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR ||
                                              mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE);
  if (mnodeUnhealthy) {
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(mnodeInfo.load.syncState));
    return;
  }

  SMonVloadInfo vnodeInfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vnodeInfo);

  // The array is not modified while scanning, so its size is read once.
  size_t numOfVnodes = taosArrayGetSize(vnodeInfo.pVloads);
  for (int32_t idx = 0; (size_t)idx < numOfVnodes; ++idx) {
    SVnodeLoad *pVnode = taosArrayGet(vnodeInfo.pVloads, idx);
    bool        healthy =
        (pVnode->syncState != TAOS_SYNC_STATE_ERROR) && (pVnode->syncState != TAOS_SYNC_STATE_OFFLINE);
    if (healthy) {
      continue;
    }

    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVnode->vgId,
             syncStr(pVnode->syncState));
    break;
  }

  // The load array filled in by getVnodeLoadsFp is released here.
  taosArrayDestroy(vnodeInfo.pVloads);
}
819

820
// Handles a server-run-status request: collects the dnode health via
// dmGetServerRunStatus(), serializes it and attaches it to pMsg as the
// response payload.
//
// On success pMsg->info.rsp / pMsg->info.rspLen describe a buffer obtained
// from rpcMallocCont(). On any failure they are left as NULL / 0 and no
// buffer remains allocated.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the size probe fails,
// terrno when the response buffer cannot be allocated, or
// TSDB_CODE_INVALID_MSG when serialization into the buffer fails.
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");

  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never attached to pMsg, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
853

854
// Allocates an empty data block whose columns mirror the schema of the
// TSDB_INS_TABLE_DNODE_VARIABLES system table.
//
// The schema is looked up by name in the table list returned by
// getInfosDbMeta(). One SColumnInfoData per schema column is appended, with
// column ids starting at 1.
//
// NOTE(review): if the table name is not found in the list, the schema of the
// first entry (index 0) is used - confirm this fallback is intended.
//
// On success *ppBlock receives the new block and the caller owns it.
// On failure the partially built block is destroyed and *ppBlock is untouched.
//
// Returns 0 on success or terrno from the failing allocation.
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  const SSysTableMeta *pTables = NULL;
  size_t               numOfTables = 0;
  getInfosDbMeta(&pTables, &numOfTables);

  // Locate the dnode-variables table; `pos` stays 0 when there is no match.
  int32_t pos = 0;
  int32_t cursor = 0;
  while ((size_t)cursor < numOfTables) {
    if (strcmp(pTables[cursor].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      pos = cursor;
      break;
    }
    ++cursor;
  }

  pNew->pDataBlock = taosArrayInit(pTables[pos].colNum, sizeof(SColumnInfoData));
  if (pNew->pDataBlock == NULL) {
    int32_t code = terrno;
    blockDataDestroy(pNew);
    return code;
  }

  for (int32_t col = 0; col < pTables[pos].colNum; ++col) {
    SColumnInfoData column = {0};
    column.info.colId = col + 1;
    column.info.type = pTables[pos].schema[col].type;
    column.info.bytes = pTables[pos].schema[col].bytes;

    if (taosArrayPush(pNew->pDataBlock, &column) == NULL) {
      int32_t code = terrno;
      blockDataDestroy(pNew);
      return code;
    }
  }

  pNew->info.hasVarCol = true;
  *ppBlock = pNew;
  return 0;
}
901

902
// Populates pBlock with the configuration dump produced by
// dumpConfToDataBlock() and then writes dnodeId into column 0 via
// colDataSetNItems() using pBlock->info.rows as the row count.
// (Presumably this stamps every dumped row with the owning dnode id -
// confirm against colDataSetNItems().)
//
// Returns 0 on success, the error from dumpConfToDataBlock() or
// colDataSetNItems(), or TSDB_CODE_OUT_OF_RANGE when the block has no
// column 0.
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t ret = dumpConfToDataBlock(pBlock, 1, NULL);

  if (ret == 0) {
    SColumnInfoData *pDnodeIdCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pDnodeIdCol != NULL) {
      ret = colDataSetNItems(pDnodeIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
    } else {
      ret = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return ret;
}
915

916
// Handles a system-table retrieve request addressed to this dnode. Only
// TSDB_INS_TABLE_DNODE_VARIABLES is served; any other table name is rejected.
//
// The response buffer (allocated with rpcMallocCont and attached to pMsg on
// success) is laid out as:
//   SRetrieveMetaTableRsp header
//   int32_t               number of columns (network byte order)
//   SSysTableSchema[n]    one entry per column (bytes/colId in network order)
//   encoded data block    produced by blockEncode()
//
// Returns TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG when the
// request cannot be decoded or names another table; otherwise the error from
// block construction, allocation (terrno) or encoding (terrno). On failure no
// response is attached and all intermediate allocations are released.
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  // Column schema table, one fixed-size entry per column.
  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before cleanup: the calls below may overwrite it, and
    // `code` is still 0 here so it must not be used for the log message.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  // The block has been copied into the response buffer and is no longer needed.
  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
991

992
// Handles the mnode's response to a stream heartbeat.
//
// The payload is an SStreamMsgGrpHeader followed by an encoded
// SMStreamHbRspMsg. Three outcomes are possible:
//   - pMsg->code is set: the error is forwarded to streamHbHandleRspErr().
//   - the payload is missing, shorter than the header, or fails to decode:
//     TSDB_CODE_INVALID_MSG is forwarded to streamHbHandleRspErr().
//   - otherwise the decoded response is passed to streamHbProcessRspMsg().
//
// Returns whatever streamHbHandleRspErr() / streamHbProcessRspMsg() return.
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMStreamHbRspMsg rsp = {0};
  int32_t          code = 0;
  SDecoder         decoder;
  int64_t          currTs = taosGetTimestampMs();

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, currTs);
  }

  // Reject payloads that cannot even hold the group header; otherwise the
  // body length below would go negative and the decoder would read past the
  // end of the buffer.
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SStreamMsgGrpHeader)) {
    code = TSDB_CODE_INVALID_MSG;
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t len = pMsg->contLen - (int32_t)sizeof(SStreamMsgGrpHeader);

  tDecoderInit(&decoder, (uint8_t *)msg, len);
  code = tDecodeStreamHbRsp(&decoder, &rsp);
  if (code < 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Release anything the decoder allocated before it failed.
    tDeepFreeSMStreamHbRspMsg(&rsp);
    tDecoderClear(&decoder);
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  tDecoderClear(&decoder);

  return streamHbProcessRspMsg(&rsp);
}
1018

1019

1020
SArray *dmGetMsgHandles() {
719,525✔
1021
  int32_t code = -1;
719,525✔
1022
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
719,525✔
1023
  if (pArray == NULL) {
719,525✔
1024
    return NULL;
×
1025
  }
1026

1027
  // Requests handled by DNODE
1028
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1029
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1030
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1031
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1032
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1033
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1034
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1035
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1036
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1037
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1038
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1039
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1040
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1041
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1042
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1043
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1044

1045
  // Requests handled by MNODE
1046
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1047
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1048
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,525✔
1049

1050
  code = 0;
719,525✔
1051

1052
_OVER:
719,525✔
1053
  if (code != 0) {
719,525✔
1054
    taosArrayDestroy(pArray);
×
1055
    return NULL;
×
1056
  } else {
1057
    return pArray;
719,525✔
1058
  }
1059
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc