• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4897

25 Dec 2025 10:17AM UTC coverage: 65.717% (-0.2%) from 65.929%
#4897

push

travis-ci

web-flow
fix: [6622889291] Fix invalid rowSize. (#34043)

186011 of 283047 relevant lines covered (65.72%)

113853896.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.37
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "monitor.h"
21
#include "stream.h"
22
#include "systable.h"
23
#include "tanalytics.h"
24
#include "tchecksum.h"
25
#include "tutil.h"
26

27
extern SConfig *tsCfg;
28
extern void setAuditDbNameToken(char *pDb, char *pToken);
29

30
#ifndef TD_ENTERPRISE
31
void setAuditDbNameToken(char *pDb, char *pToken) {}
32
#endif
33

34
extern void getAuditDbNameToken(char *pDb, char *pToken);
35

36
#ifndef TD_ENTERPRISE
37
void getAuditDbNameToken(char *pDb, char *pToken) {}
38
#endif
39

40
SMonVloadInfo tsVinfo = {0};
41
SMnodeLoad    tsMLoad = {0};
42
SDnodeData    tsDnodeData = {0};
43

44
/*
 * Adopt the dnode id and cluster id carried in a status response.
 *
 * Nothing happens when both ids are already non-zero. Otherwise the new ids are
 * stored under the pData write lock, forwarded to the monitor and audit modules,
 * and persisted through dmWriteEps(). A persistence failure is logged as an error
 * but does not undo the in-memory update.
 *
 * @param pMgmt  dnode management context whose pData is updated
 * @param pCfg   dnode configuration received from the mnode
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  // Guard clause: the local identity is only filled in while one of the ids is still unset.
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);

  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // A failed write is an error condition, so report it at error level like the
    // other dmWriteEps() call sites in this file do.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
2,441,745✔
61

62
/*
 * Request the IP white list from the mnode when its version differs from the
 * locally cached pData->ipWhiteVer.
 *
 * When the versions match nothing is requested; if that shared version is 0 the
 * server transport's white list is cleared via rpcSetIpWhite(..., NULL).
 * Otherwise a TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL request carrying the local
 * version is sent asynchronously to the current mnode endpoint set.
 *
 * @param pMgmt  dnode management context
 * @param ver    white-list version reported in the status response
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);

  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }

  int64_t               oldVer = pMgmt->pData->ipWhiteVer;
  SRetrieveWhiteListReq req = {.ver = oldVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not hand a NULL buffer to the serializer.
    dError("failed to allocate ip white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
107

108

109

110
/*
 * Request the datetime white list from the mnode when its version differs from
 * the locally cached pData->timeWhiteVer.
 *
 * When the versions match nothing is requested. Otherwise a
 * TDMT_MND_RETRIEVE_DATETIME_WHITELIST request carrying the local version is
 * sent asynchronously to the current mnode endpoint set.
 *
 * @param pMgmt  dnode management context
 * @param ver    datetime white-list version reported in the status response
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);

  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this clears the *IP* white list via rpcSetIpWhite(); confirm that is
      // really the intended way to disable the time white list and not a copy-paste.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }

  int64_t               oldVer = pMgmt->pData->timeWhiteVer;
  SRetrieveWhiteListReq req = {.ver = oldVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not hand a NULL buffer to the serializer.
    dError("failed to allocate datetime white list request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
155

156

157

158
/*
 * Request the analytics algorithm list from the mnode when the version in the
 * status response differs from the local one (taosAnalyGetVersion()).
 *
 * @param pMgmt   dnode management context
 * @param newVer  analytics version reported in the status response
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not hand a NULL buffer to the serializer.
    dError("failed to allocate analysis function ver request, size:%d", contLen);
    return;
  }

  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
197

198
/*
 * Handle the mnode's reply to a status request and release its payload.
 *
 * Success (code == 0): when a body is present and decodes, a changed dnodeVer
 * triggers an update of the local dnode config and endpoint list; the audit
 * db/token are stored and the ip/time white lists and analytics version are
 * checked for refresh.
 * Failure: TSDB_CODE_MND_DNODE_NOT_EXIST for a dnode that has an id and is not
 * yet marked dropped marks it dropped, persists that, and raises SIGINT.
 *
 * pRsp->pCont is freed on every path.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced directly below; kept under this exact name for the dG* log macros,
  // which presumably pick it up.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasBody = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasBody && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }

      // NOTE(review): the audit token is written to the info log for every decoded
      // status response; confirm that exposing it in logs is acceptable.
      dGInfo("dnode:%d, set auditDB:%s, token:%s in status rsp received from mnode", pMgmt->pData->dnodeId,
             statusRsp.auditDB, statusRsp.auditToken);
      setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken);

      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }

    // Released whether or not decoding succeeded.
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
41,627,094✔
235

236
/*
 * Build a status request from the cached snapshot (tsDnodeData / tsVinfo /
 * tsMLoad) plus current configuration, send it synchronously to the mnode over
 * the status transport, and hand the reply to dmProcessStatusRsp().
 *
 * Ownership: the vnode-load array is moved out of tsVinfo into the request while
 * statusInfolock is held, so every exit after that point must release the
 * request with tFreeSStatusReq().
 *
 * On a transport failure that is a timeout, or on a non-zero response code, the
 * mnode endpoint set is rotated. On success with an updated endpoint set, the
 * new set is stored.
 *
 * @param pMgmt  dnode management context; statusSeq is incremented on every call
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);

  // checkTime is the local parse of a fixed reference timestamp.
  char timestr[32] = "1970-01-01 00:00:00.00";
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    // Report the parser's own result; previously this printed the still-zero `code`.
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Move (not copy) the vnode loads into the request; req owns them from here on.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  if (tsAuditUseToken) {
    getAuditDbNameToken(req.auditDB, req.auditToken);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to allocate status req, size:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }

  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar members of req are read after the free above.
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }

  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // The failure is the transport result `code`; rpcRsp was not filled with a reply here.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
373

374
/*
 * Handle the mnode's reply to a config request and release its payload.
 *
 * Failure code: TSDB_CODE_MND_DNODE_NOT_EXIST for a dnode that has an id and is
 * not yet marked dropped marks it dropped, persists that, and raises SIGINT.
 * Success: when the body decodes and the config version was not verified, the
 * received global config is persisted, applied to tsCfg, and pushed to the
 * runtime via setAllConfigs(). The local config is then persisted and
 * tsConfigInited is set. Any failure while applying the global config skips the
 * remaining steps, so tsConfigInited stays unset.
 *
 * configRsp and pRsp->pCont are released on every path.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced directly below; kept under this exact name for the dG* log macros,
  // which presumably pick it up.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  // Try to use cfg from mnode sdb.
  if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
      tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0 && !configRsp.isVersionVerified) {
    uInfo("config version not verified, update config");

    code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist global config since %s", tstrerror(code));
      goto _exit;
    }

    code = cfgUpdateFromArray(tsCfg, configRsp.array);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to update config since %s", tstrerror(code));
      goto _exit;
    }

    code = setAllConfigs(tsCfg);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to set all configs since %s", tstrerror(code));
      goto _exit;
    }
  }

  // A local-persist failure is logged but does not prevent marking config as initialised.
  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
701,579✔
430

431
/*
 * Send the local config version and global config array to the mnode over the
 * status transport and hand a successful reply to dmProcessConfigRsp().
 *
 * A transport failure or a non-zero response code is logged and ends the call;
 * in the latter case the response payload is released here because
 * dmProcessConfigRsp() (which would otherwise free it) is not invoked.
 *
 * @param pMgmt  dnode management context
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so print it as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }

  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
484

485
/*
 * Refresh the status snapshot (tsDnodeData, tsVinfo, tsMLoad) that
 * dmSendStatusReq() later reads.
 *
 * Dnode identity fields are copied out under the pData read lock, vnode and
 * mnode loads are collected through the registered callbacks, and only then is
 * statusInfolock taken to publish everything. A freshly collected vnode-load
 * array is published only if the previous one has already been consumed
 * (tsVinfo.pVloads == NULL); otherwise the new array is discarded.
 *
 * @param pMgmt  dnode management context
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected vnode loads were never published; release them before bailing out.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }

  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Previous snapshot was consumed: hand over ownership of the new array.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // Previous snapshot is still pending: keep it and drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
  }
}
532

533
/*
 * Serialize a notify request and send it to the mnode as a one-way message
 * (info.noResp = 1) over the client transport.
 *
 * @param pMgmt  dnode management context
 * @param pReq   notify request to serialize; not modified here
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not hand a NULL buffer to the serializer.
    dError("failed to allocate notify req, size:%d", contLen);
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
562

563
/*
 * Placeholder handler for an auth response: neither argument is used, an error
 * is logged stating that the message is not supported, and 0 is returned.
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
564
  dError("auth rsp is received, but not supported yet");
×
565
  return 0;
×
566
}
567

568
/*
 * Placeholder handler for a grant response: neither argument is used, an error
 * is logged stating that the message is not supported, and 0 is returned.
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
569
  dError("grant rsp is received, but not supported yet");
×
570
  return 0;
×
571
}
572

573
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
86,186✔
574
  int32_t       code = 0;
86,186✔
575
  SDCfgDnodeReq cfgReq = {0};
86,186✔
576
  SConfig      *pCfg = taosGetCfg();
86,186✔
577
  SConfigItem  *pItem = NULL;
86,186✔
578

579
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
86,186✔
580
    return TSDB_CODE_INVALID_MSG;
×
581
  }
582
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
86,186✔
583
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
520✔
584
  }
585

586
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
85,666✔
587

588
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
85,666✔
589
  if (code != 0) {
85,666✔
590
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
468✔
591
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
468✔
592
      return TSDB_CODE_SUCCESS;
468✔
593
    } else {
594
      return code;
×
595
    }
596
  }
597
  if (pItem == NULL) {
85,198✔
598
    return TSDB_CODE_CFG_NOT_FOUND;
×
599
  }
600

601
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
85,198✔
602
    char value[10] = {0};
×
603
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
604
      tsSyncTimeout = 0;
×
605
    }
606

607
    if (tsSyncTimeout > 0) {
×
608
      SConfigItem *pItemTmp = NULL;
×
609
      char         tmp[10] = {0};
×
610

611
      sprintf(tmp, "%d", tsSyncTimeout);
×
612
      TAOS_CHECK_RETURN(
×
613
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
614
      if (pItemTmp == NULL) {
×
615
        return TSDB_CODE_CFG_NOT_FOUND;
×
616
      }
617

618
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
619
      TAOS_CHECK_RETURN(
×
620
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
621
      if (pItemTmp == NULL) {
×
622
        return TSDB_CODE_CFG_NOT_FOUND;
×
623
      }
624
      TAOS_CHECK_RETURN(
×
625
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
626
      if (pItemTmp == NULL) {
×
627
        return TSDB_CODE_CFG_NOT_FOUND;
×
628
      }
629

630
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
631
      TAOS_CHECK_RETURN(
×
632
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
633
      if (pItemTmp == NULL) {
×
634
        return TSDB_CODE_CFG_NOT_FOUND;
×
635
      }
636
      TAOS_CHECK_RETURN(
×
637
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
638
      if (pItemTmp == NULL) {
×
639
        return TSDB_CODE_CFG_NOT_FOUND;
×
640
      }
641
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
642
      if (pItemTmp == NULL) {
×
643
        return TSDB_CODE_CFG_NOT_FOUND;
×
644
      }
645

646
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
647
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
648
      if (pItemTmp == NULL) {
×
649
        return TSDB_CODE_CFG_NOT_FOUND;
×
650
      }
651

652
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
653
      TAOS_CHECK_RETURN(
×
654
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
655
      if (pItemTmp == NULL) {
×
656
        return TSDB_CODE_CFG_NOT_FOUND;
×
657
      }
658
      TAOS_CHECK_RETURN(
×
659
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
660
      if (pItemTmp == NULL) {
×
661
        return TSDB_CODE_CFG_NOT_FOUND;
×
662
      }
663
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
664
      if (pItemTmp == NULL) {
×
665
        return TSDB_CODE_CFG_NOT_FOUND;
×
666
      }
667

668
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
669
            tsSyncTimeout);
670
    }
671
  }
672

673
  if (!isConifgItemLazyMode(pItem)) {
85,198✔
674
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
84,304✔
675

676
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
84,304✔
677
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
678
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
679
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
680

681
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
682
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
683
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
684
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
685

686
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
687
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
688
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
689

690
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
691
            tsSyncTimeout);
692
    }
693
  }
694

695
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
85,198✔
696
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
14,912✔
697
    if (code != TSDB_CODE_SUCCESS) {
14,912✔
698
      dError("failed to persist global config since %s", tstrerror(code));
×
699
    }
700
  } else {
701
    code = taosPersistLocalConfig(pMgmt->path);
70,286✔
702
    if (code != TSDB_CODE_SUCCESS) {
70,286✔
703
      dError("failed to persist local config since %s", tstrerror(code));
×
704
    }
705
  }
706

707
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
85,198✔
708
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
709

710
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
711
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
712
  }
713

714
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
85,198✔
715
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
85,198✔
716
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
717
  }
718

719
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
85,198✔
720
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
85,198✔
721
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
722
  }
723

724
  if (cfgReq.version > 0) {
85,198✔
725
    tsdmConfigVersion = cfgReq.version;
29,602✔
726
  }
727
  return code;
85,198✔
728
}
729

730
/*
 * Install the encryption key carried in an SDCfgDnodeReq payload.
 *
 * With TD_ENTERPRISE defined: the payload is decoded and its value passed to
 * dmUpdateEncryptKey(); on success the key checksum, key state and key string
 * globals are refreshed. The outcome is written to pMsg->code, the response
 * buffer fields of pMsg are cleared, and the same code is returned.
 * Without TD_ENTERPRISE the function does nothing and returns 0.
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq cfgReq = {0};
  // Result reported when the payload cannot be decoded.
  int32_t       code = TSDB_CODE_INVALID_MSG;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) == 0) {
    code = dmUpdateEncryptKey(cfgReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  return 0;
#endif
}
755

756
/*
 * Reload the TLS configuration of the four RPC transports referenced by the
 * management message callbacks, in this order: server, client, status client,
 * sync client. The server transport is reloaded as TAOS_CONN_SERVER, the
 * other three as TAOS_CONN_CLIENT.
 *
 * The first failing reload is logged and its code returned immediately; the
 * remaining transports are then left untouched.
 *
 * pMgmt  supplies msgCb with the transport handles.
 * pMsg   not referenced by this function.
 *
 * Returns 0 when all four reloads succeed, otherwise the first non-zero code
 * from rpcReloadTlsConfig().
 */
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMsgCb *pCb = &pMgmt->msgCb;
  void   *hClient = pCb->clientRpc;
  void   *hStatus = pCb->statusRpc;
  void   *hSync = pCb->syncRpc;
  void   *hServer = pCb->serverRpc;
  int32_t ret = 0;

  ret = rpcReloadTlsConfig(hServer, TAOS_CONN_SERVER);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hClient, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hStatus, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(ret));
    return ret;
  }

  ret = rpcReloadTlsConfig(hSync, TAOS_CONN_CLIENT);
  if (ret != 0) {
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(ret));
  }

  return ret;
}
793

794
/*
 * Fill pStatus with the current service status of this dnode.
 *
 * The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
 * switched to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a details string naming
 * the offender, when either
 *   - this node is an mnode whose sync state is ERROR or OFFLINE, or
 *   - any vnode load entry reports sync state ERROR or OFFLINE (only the
 *     first such vnode is reported).
 * Vnode loads are not queried at all when the mnode check already failed.
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  pMgmt->getMnodeLoadsFp(&mnodeInfo);

  if (mnodeInfo.isMnode) {
    if (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR || mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
               syncStr(mnodeInfo.load.syncState));
      return;
    }
  }

  SMonVloadInfo vnodeInfo = {0};
  pMgmt->getVnodeLoadsFp(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pEntry = taosArrayGet(vnodeInfo.pVloads, idx);
    ++idx;

    // Healthy vnode: keep scanning.
    if (pEntry->syncState != TAOS_SYNC_STATE_ERROR && pEntry->syncState != TAOS_SYNC_STATE_OFFLINE) {
      continue;
    }

    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pEntry->vgId,
             syncStr(pEntry->syncState));
    break;
  }

  // The load array obtained from getVnodeLoadsFp is released here.
  taosArrayDestroy(vnodeInfo.pVloads);
}
821

822
/*
 * Handle a "server run status" request: collect the dnode status via
 * dmGetServerRunStatus(), serialize it, and attach it to pMsg as the response.
 *
 * pMgmt  dnode management context forwarded to dmGetServerRunStatus().
 * pMsg   request message; on success info.rsp / info.rspLen receive the
 *        serialized SServerStatusRsp buffer. On every failure path they are
 *        left as NULL / 0.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the size probe fails,
 * terrno if the response buffer cannot be allocated, or
 * TSDB_CODE_INVALID_MSG if serialization into the buffer fails.
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the required length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never handed to pMsg, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
855

856
/*
 * Allocate an SSDataBlock whose columns mirror the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * The table is looked up by name in the list returned by getInfosDbMeta().
 * If no entry matches, entry 0 of that list is used. Column ids are assigned
 * 1..colNum in schema order.
 *
 * ppBlock  receives the new block when the result code is 0; it is not
 *          written otherwise.
 *
 * Returns 0 on success, or the terrno value captured after a failed
 * allocation / array operation (the partially built block is destroyed when
 * that value is non-zero).
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  int32_t status = 0;

  SSDataBlock *pResult = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pResult == NULL) {
    return terrno;
  }

  size_t               metaCnt = 0;
  const SSysTableMeta *pMetaList = NULL;
  getInfosDbMeta(&pMetaList, &metaCnt);

  // Locate the dnode-variables table; pos stays 0 if the name is absent.
  int32_t pos = 0;
  int32_t k = 0;
  while (k < metaCnt) {
    if (strcmp(pMetaList[k].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      pos = k;
      break;
    }
    ++k;
  }

  pResult->pDataBlock = taosArrayInit(pMetaList[pos].colNum, sizeof(SColumnInfoData));
  if (pResult->pDataBlock == NULL) {
    status = terrno;
    goto _done;
  }

  // One column descriptor per schema entry, ids starting at 1.
  for (int32_t c = 0; c < pMetaList[pos].colNum; ++c) {
    SColumnInfoData col = {0};
    col.info.colId = c + 1;
    col.info.type = pMetaList[pos].schema[c].type;
    col.info.bytes = pMetaList[pos].schema[c].bytes;

    if (taosArrayPush(pResult->pDataBlock, &col) == NULL) {
      status = terrno;
      goto _done;
    }
  }

  pResult->info.hasVarCol = true;

_done:
  if (status == 0) {
    *ppBlock = pResult;
    return status;
  }

  blockDataDestroy(pResult);
  return status;
}
903

904
/*
 * Fill pBlock with the configuration dump produced by dumpConfToDataBlock()
 * and then write dnodeId into column 0 for pBlock->info.rows rows.
 *
 * Returns the non-zero code from dumpConfToDataBlock() if the dump fails,
 * TSDB_CODE_OUT_OF_RANGE if column 0 cannot be fetched from the block,
 * otherwise the result of colDataSetNItems().
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t ret = dumpConfToDataBlock(pBlock, 1, NULL);

  if (ret == 0) {
    SColumnInfoData *pFirstCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pFirstCol != NULL) {
      ret = colDataSetNItems(pFirstCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
    } else {
      ret = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return ret;
}
917

918
/*
 * Handle a system-table retrieve request addressed to the dnode.
 *
 * Only TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is
 * served. The response buffer is laid out as:
 *   SRetrieveMetaTableRsp header
 *   int32 column count (network byte order)
 *   one SSysTableSchema per column (bytes/colId in network byte order)
 *   the encoded data block
 *
 * pMgmt  supplies the dnode id written into the first column.
 * pMsg   request message; on success info.rsp / info.rspLen receive the
 *        response buffer and its allocated size.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_MSG for an
 * undecodable request or an unsupported table, or the error code of the
 * failing build / append / allocation / encode step.
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  // Column schemas follow the column count, one fixed-size record each.
  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno first: the cleanup calls below may overwrite it, and the
    // log line must report the encode failure rather than a stale code of 0.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
993

994
/*
 * Handle the response to a stream heartbeat.
 *
 * The payload is expected to start with an SStreamMsgGrpHeader, which is
 * skipped before decoding. If the message already carries an error code, or
 * the payload cannot be decoded, the error is forwarded to
 * streamHbHandleRspErr() together with the timestamp taken on entry.
 * Otherwise the decoded response is passed to streamHbProcessRspMsg().
 *
 * pMgmt  not referenced by this function.
 * pMsg   heartbeat response message.
 *
 * Returns whatever streamHbHandleRspErr() or streamHbProcessRspMsg() returns.
 */
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  int32_t          ret = 0;
  int64_t          nowMs = taosGetTimestampMs();
  // Body starts right after the group header.
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t          bodyLen = pMsg->contLen - sizeof(SStreamMsgGrpHeader);

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, nowMs);
  }

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  ret = tDecodeStreamHbRsp(&dec, &hbRsp);
  if (ret >= 0) {
    tDecoderClear(&dec);
    return streamHbProcessRspMsg(&hbRsp);
  }

  // Any decode failure is reported as an invalid message.
  ret = TSDB_CODE_INVALID_MSG;
  tDeepFreeSMStreamHbRspMsg(&hbRsp);
  tDecoderClear(&dec);
  dError("fail to decode stream hb rsp msg, error:%s", tstrerror(ret));
  return streamHbHandleRspErr(ret, nowMs);
}
1020

1021

1022
SArray *dmGetMsgHandles() {
706,425✔
1023
  int32_t code = -1;
706,425✔
1024
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
706,425✔
1025
  if (pArray == NULL) {
706,425✔
1026
    return NULL;
×
1027
  }
1028

1029
  // Requests handled by DNODE
1030
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1031
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1032
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1033
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1034
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1035
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1036
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1037
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1038
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1039
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1040
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1041
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1042
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1043
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1044
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1045
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1046

1047
  // Requests handled by MNODE
1048
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1049
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1050
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
706,425✔
1051

1052
  code = 0;
706,425✔
1053

1054
_OVER:
706,425✔
1055
  if (code != 0) {
706,425✔
1056
    taosArrayDestroy(pArray);
×
1057
    return NULL;
×
1058
  } else {
1059
    return pArray;
706,425✔
1060
  }
1061
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc