• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4876

10 Dec 2025 05:56AM UTC coverage: 64.632% (+0.2%) from 64.472%
#4876

push

travis-ci

guanshengliang
test: fix idmp case with checkDataMemLoop checked (#33862)

4 of 9 new or added lines in 3 files covered. (44.44%)

380 existing lines in 104 files now uncovered.

162866 of 251990 relevant lines covered (64.63%)

107950382.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.09
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24
#include "stream.h"
25

26
extern SConfig *tsCfg;
27

28
SMonVloadInfo tsVinfo = {0};
29
SMnodeLoad    tsMLoad = {0};
30
SDnodeData    tsDnodeData = {0};
31

32
/*
 * Adopt the dnodeId / clusterId carried in pCfg when either local value is
 * still 0. The new id is also handed to the monitor and audit modules and the
 * result is persisted through dmWriteEps() while the write lock is held.
 *
 * pMgmt  dnode management context whose pData is updated
 * pCfg   dnode configuration taken from the status response
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  int32_t code = 0;
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
    pMgmt->pData->dnodeId = pCfg->dnodeId;
    pMgmt->pData->clusterId = pCfg->clusterId;
    monSetDnodeId(pCfg->dnodeId);
    auditSetDnodeId(pCfg->dnodeId);
    code = dmWriteEps(pMgmt->pData);
    if (code != 0) {
      // A failed persist is a real error, so report it at error level.
      dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
             tstrerror(code));
    }
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
  }
}
2,498,878✔
49

50
/*
 * Compare the locally known ip-white-list version with the one reported in the
 * status response. When they match nothing is fetched (and for version 0 the
 * white list is cleared on the server rpc handle). When they differ, a
 * TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL request carrying the local version is
 * sent to the mnode.
 *
 * pMgmt  dnode management context
 * ver    ip-white-list version reported by the mnode
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  SRetrieveWhiteListReq req = {.ver = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into, or send, a NULL message body.
    dError("failed to malloc cont for ip white list request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
95

96

97

98
/*
 * Compare the locally known time-white-list version with the one reported in
 * the status response. When they differ, a TDMT_MND_RETRIEVE_DATETIME_WHITELIST
 * request carrying the local version is sent to the mnode.
 *
 * pMgmt  dnode management context
 * ver    time-white-list version reported by the mnode
 */
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this clears the *ip* white list from the time-white-list path;
      // it looks copied from dmMayShouldUpdateIpWhiteList -- confirm this is intended.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->timeWhiteVer;

  SRetrieveWhiteListReq req = {.ver = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into, or send, a NULL message body.
    dError("failed to malloc cont for datetime white list request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
143

144

145

146
/*
 * If the analytics version reported by the mnode differs from the local one
 * (taosAnalyGetVersion()), send a TDMT_MND_RETRIEVE_ANAL_ALGO request carrying
 * the local dnodeId and version to the mnode.
 *
 * pMgmt   dnode management context
 * newVer  analytics version reported in the status response
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into, or send, a NULL message body.
    dError("failed to malloc cont for analysis function ver request, size:%d", contLen);
    return;
  }
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
185

186
/*
 * Handle the mnode's answer to a status request.
 *
 * On success the body (if any) is decoded; a changed dnodeVer triggers an
 * update of the local dnode config and endpoint list, and the white-list and
 * analytics versions are then checked. On TSDB_CODE_MND_DNODE_NOT_EXIST for a
 * dnode that already has an id and is not yet marked dropped, the dnode is
 * marked dropped, persisted, and SIGINT is raised. The response body is always
 * released before returning.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // `trace` is not used directly below; presumably the dG* log macros pick it up by name.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasBody = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasBody && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
55,418,969✔
220

221
/*
 * Build a status request from the cached dnode / vnode / mnode information
 * (guarded by statusInfolock), the qnode loads and the global configuration,
 * send it synchronously to the mnode over the status rpc handle, and pass the
 * reply to dmProcessStatusRsp().
 *
 * Ownership: the cached vnode load array (tsVinfo.pVloads) is moved into the
 * request, so every exit taken after that point releases it through
 * tFreeSStatusReq().
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the failure log reports the real reason
  // (previously `code` was still 0 here and the log always said success).
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Move the cached vnode loads into the request; from here on `req` owns them.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req, size:%d", contLen);
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar fields of `req` are read after tFreeSStatusReq().
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // Report the send/recv failure itself; rpcRsp was never filled in on this path.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
354

355
/*
 * Handle the mnode's answer to a config request.
 *
 * On success: if the reply says the config version is not verified, the
 * returned global config is persisted and applied to tsCfg; the local config
 * is then persisted and tsConfigInited is set. On
 * TSDB_CODE_MND_DNODE_NOT_EXIST for a dnode that already has an id and is not
 * yet marked dropped, the dnode is marked dropped, persisted, and SIGINT is
 * raised. The decoded reply and the response body are always released.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // `trace` is not used directly below; presumably the dG* log macros pick it up by name.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
  } else {
    bool needUpdate = false;
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
      // Try to use cfg from mnode sdb.
      if (!configRsp.isVersionVerified) {
        uInfo("config version not verified, update config");
        needUpdate = true;
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
        if (code != TSDB_CODE_SUCCESS) {
          dError("failed to persist global config since %s", tstrerror(code));
          goto _exit;
        }
      }
    }
    if (needUpdate) {
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to update config since %s", tstrerror(code));
        goto _exit;
      }
      code = setAllConfigs(tsCfg);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to set all configs since %s", tstrerror(code));
        goto _exit;
      }
    }
    // A failure to persist the local config is logged but does not block initialization.
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
    tsConfigInited = 1;
  }
_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
672,038✔
411

412
/*
 * Send the local config version and global config array to the mnode over the
 * status rpc handle (synchronously, bounded by tsStatusSRTimeoutMs) and pass a
 * successful reply to dmProcessConfigRsp(), which releases the response body.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so print it as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // The reply is not handed to dmProcessConfigRsp() on this path, so release its body here.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
465

466
/*
 * Refresh the cached status snapshot (tsDnodeData, tsVinfo, tsMLoad).
 *
 * The dnode fields are copied under the pData read lock, vnode and mnode loads
 * are collected through the registered callbacks, and the results are then
 * published under statusInfolock. A freshly collected vnode load array is kept
 * only when the cache slot is empty; otherwise it is destroyed.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected vnode loads were never published; release them here.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache slot is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // A previous snapshot is still cached; drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
513

514
/*
 * Serialize pReq and send it to the mnode as a TDMT_MND_NOTIFY message with
 * info.noResp set to 1. Failures are logged and otherwise ignored.
 *
 * pMgmt  dnode management context
 * pReq   notify request to serialize and send
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into, or send, a NULL message body.
    dError("failed to malloc cont for notify req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
543

544
/*
 * Placeholder handler for auth responses: logs that the message is not
 * supported and returns 0. Neither argument is read.
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t result = 0;

  dError("auth rsp is received, but not supported yet");

  return result;
}
548

549
/*
 * Placeholder handler for grant responses: logs that the message is not
 * supported and returns 0. Neither argument is read.
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t result = 0;

  dError("grant rsp is received, but not supported yet");

  return result;
}
553

554
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
89,828✔
555
  int32_t       code = 0;
89,828✔
556
  SDCfgDnodeReq cfgReq = {0};
89,828✔
557
  SConfig      *pCfg = taosGetCfg();
89,828✔
558
  SConfigItem  *pItem = NULL;
89,828✔
559

560
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
89,828✔
561
    return TSDB_CODE_INVALID_MSG;
×
562
  }
563
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
89,828✔
564
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
543✔
565
  }
566

567
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
89,285✔
568

569
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
89,285✔
570
  if (code != 0) {
89,285✔
571
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
464✔
572
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
464✔
573
      return TSDB_CODE_SUCCESS;
464✔
574
    } else {
575
      return code;
×
576
    }
577
  }
578
  if (pItem == NULL) {
88,821✔
579
    return TSDB_CODE_CFG_NOT_FOUND;
×
580
  }
581

582
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
88,821✔
583
    char value[10] = {0};
×
584
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
585
      tsSyncTimeout = 0;
×
586
    }
587

588
    if (tsSyncTimeout > 0) {
×
589
      SConfigItem *pItemTmp = NULL;
×
590
      char         tmp[10] = {0};
×
591

592
      sprintf(tmp, "%d", tsSyncTimeout);
×
593
      TAOS_CHECK_RETURN(
×
594
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
595
      if (pItemTmp == NULL) {
×
596
        return TSDB_CODE_CFG_NOT_FOUND;
×
597
      }
598

599
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
600
      TAOS_CHECK_RETURN(
×
601
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
602
      if (pItemTmp == NULL) {
×
603
        return TSDB_CODE_CFG_NOT_FOUND;
×
604
      }
605
      TAOS_CHECK_RETURN(
×
606
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
607
      if (pItemTmp == NULL) {
×
608
        return TSDB_CODE_CFG_NOT_FOUND;
×
609
      }
610

611
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
612
      TAOS_CHECK_RETURN(
×
613
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
614
      if (pItemTmp == NULL) {
×
615
        return TSDB_CODE_CFG_NOT_FOUND;
×
616
      }
617
      TAOS_CHECK_RETURN(
×
618
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
619
      if (pItemTmp == NULL) {
×
620
        return TSDB_CODE_CFG_NOT_FOUND;
×
621
      }
622
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
623
      if (pItemTmp == NULL) {
×
624
        return TSDB_CODE_CFG_NOT_FOUND;
×
625
      }
626

627
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
628
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
629
      if (pItemTmp == NULL) {
×
630
        return TSDB_CODE_CFG_NOT_FOUND;
×
631
      }
632

633
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
634
      TAOS_CHECK_RETURN(
×
635
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
636
      if (pItemTmp == NULL) {
×
637
        return TSDB_CODE_CFG_NOT_FOUND;
×
638
      }
639
      TAOS_CHECK_RETURN(
×
640
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
641
      if (pItemTmp == NULL) {
×
642
        return TSDB_CODE_CFG_NOT_FOUND;
×
643
      }
644
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
645
      if (pItemTmp == NULL) {
×
646
        return TSDB_CODE_CFG_NOT_FOUND;
×
647
      }
648

649
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
650
            tsSyncTimeout);
651
    }
652
  }
653

654
  if (!isConifgItemLazyMode(pItem)) {
88,821✔
655
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
87,957✔
656

657
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
87,957✔
658
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
659
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
660
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
661

662
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
663
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
664
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
665
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
666

667
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
668
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
669
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
670

671
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
672
            tsSyncTimeout);
673
    }
674
  }
675

676
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
88,821✔
677
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
20,146✔
678
    if (code != TSDB_CODE_SUCCESS) {
20,146✔
679
      dError("failed to persist global config since %s", tstrerror(code));
×
680
    }
681
  } else {
682
    code = taosPersistLocalConfig(pMgmt->path);
68,675✔
683
    if (code != TSDB_CODE_SUCCESS) {
68,675✔
684
      dError("failed to persist local config since %s", tstrerror(code));
×
685
    }
686
  }
687

688
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
88,821✔
689
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
690

691
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
692
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
693
  }
694

695
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
88,821✔
696
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
88,821✔
697
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
698
  }
699

700
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
88,821✔
701
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
88,821✔
702
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
703
  }
704

705
  if (cfgReq.version > 0) {
88,821✔
706
    tsdmConfigVersion = cfgReq.version;
34,710✔
707
  }
708
  return code;
88,821✔
709
}
710

711
/*
 * Handle a create-encrypt-key request.
 *
 * Enterprise builds decode the request, install the key through
 * dmUpdateEncryptKey(), and on success refresh the cached key, its checksum
 * and the key status. The result code is stored in pMsg->code, the response
 * payload fields are cleared, and the code is returned. Other builds return 0
 * without touching the message.
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq keyReq = {0};
  int32_t       ret = TSDB_CODE_INVALID_MSG;  // reported if the request cannot be decoded

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &keyReq) == 0) {
    ret = dmUpdateEncryptKey(keyReq.value, true);
    if (ret == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, keyReq.value, strlen(keyReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, keyReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = ret;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return ret;
#else
  return 0;
#endif
}
736

737
// Fill pStatus with the current service state of this dnode.
//
// The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
// downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED (with an explanatory detail
// string) when the local mnode, or the first vnode found, reports a sync
// state of ERROR or OFFLINE. The mnode is checked first; if it is degraded
// the vnodes are not queried at all.
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;

  SMonMloadInfo mnodeInfo = {0};
  pMgmt->getMnodeLoadsFp(&mnodeInfo);
  if (mnodeInfo.isMnode) {
    if (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR || mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
               syncStr(mnodeInfo.load.syncState));
      return;
    }
  }

  SMonVloadInfo vnodeInfo = {0};
  pMgmt->getVnodeLoadsFp(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pVload = taosArrayGet(vnodeInfo.pVloads, idx);
    if (pVload->syncState == TAOS_SYNC_STATE_ERROR || pVload->syncState == TAOS_SYNC_STATE_OFFLINE) {
      // Report only the first unhealthy vnode.
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVload->vgId,
               syncStr(pVload->syncState));
      break;
    }
    ++idx;
  }

  // The vnode load array is released here once it has been inspected.
  taosArrayDestroy(vnodeInfo.pVloads);
}
764

765
// Handle a "server run status" request.
//
// Collects the current status via dmGetServerRunStatus(), serializes it into
// a buffer obtained from rpcMallocCont() and attaches that buffer to
// pMsg->info.rsp / pMsg->info.rspLen.
//
// Returns 0 on success. On failure pMsg->info.rsp stays NULL and one of the
// following is returned:
//   - TSDB_CODE_OUT_OF_MEMORY if the size-probing serialization pass fails,
//   - terrno if the response buffer cannot be allocated,
//   - TSDB_CODE_INVALID_MSG if the final serialization fails.
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");

  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer has not been handed to pMsg yet, so release it here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
798

799
// Allocate an empty data block whose columns follow the schema of the
// TSDB_INS_TABLE_DNODE_VARIABLES system table.
//
// The table is looked up by name in the metadata returned by
// getInfosDbMeta(); if no entry matches, the first entry (index 0) is used.
// Column ids are assigned starting at 1.
//
// On success *ppBlock receives the new block and 0 is returned. On failure
// the partially built block is destroyed, *ppBlock is left untouched and
// terrno is returned.
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pResBlock == NULL) {
    return terrno;
  }

  const SSysTableMeta *pTables = NULL;
  size_t               numOfTables = 0;
  getInfosDbMeta(&pTables, &numOfTables);

  // Locate the dnode-variables table; fall back to index 0 when absent.
  int32_t pos = 0;
  while (pos < numOfTables && strcmp(pTables[pos].name, TSDB_INS_TABLE_DNODE_VARIABLES) != 0) {
    ++pos;
  }
  int32_t tbIdx = (pos < numOfTables) ? pos : 0;

  int32_t ret = 0;
  pResBlock->pDataBlock = taosArrayInit(pTables[tbIdx].colNum, sizeof(SColumnInfoData));
  if (pResBlock->pDataBlock == NULL) {
    ret = terrno;
  }

  for (int32_t col = 0; ret == 0 && col < pTables[tbIdx].colNum; ++col) {
    SColumnInfoData colData = {0};
    colData.info.bytes = pTables[tbIdx].schema[col].bytes;
    colData.info.type = pTables[tbIdx].schema[col].type;
    colData.info.colId = col + 1;
    if (taosArrayPush(pResBlock->pDataBlock, &colData) == NULL) {
      ret = terrno;
    }
  }

  if (ret != 0) {
    blockDataDestroy(pResBlock);
    return ret;
  }

  pResBlock->info.hasVarCol = true;
  *ppBlock = pResBlock;
  return ret;
}
846

847
// Dump the configuration into pBlock via dumpConfToDataBlock(), then write
// dnodeId into column 0 for pBlock->info.rows rows.
//
// Returns the error from dumpConfToDataBlock() if the dump fails,
// TSDB_CODE_OUT_OF_RANGE if the block has no column 0, otherwise the result
// of colDataSetNItems().
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t ret = dumpConfToDataBlock(pBlock, 1, NULL);
  if (ret == 0) {
    SColumnInfoData *pIdCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pIdCol != NULL) {
      ret = colDataSetNItems(pIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
    } else {
      ret = TSDB_CODE_OUT_OF_RANGE;
    }
  }
  return ret;
}
860

861
// Handle a system-table retrieve request sent to the dnode.
//
// Only TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is
// served; any other table name, or an undecodable request, yields
// TSDB_CODE_INVALID_MSG.
//
// The response buffer is laid out as:
//   SRetrieveMetaTableRsp header
//   int32_t  number of columns           (network byte order)
//   SSysTableSchema[numOfCols]           (bytes/colId in network byte order)
//   encoded data block produced by blockEncode()
//
// On success the buffer is attached to pMsg->info.rsp / rspLen and
// TSDB_CODE_SUCCESS is returned. On failure every intermediate resource is
// released and the error code is returned.
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before the cleanup calls below get a chance to overwrite
    // it, and log the real failure reason instead of the still-zero `code`.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
936

937
// Handle the response to a stream heartbeat.
//
// If the RPC itself carries an error code, it is forwarded to
// streamHbHandleRspErr() together with the current timestamp. Otherwise the
// payload that follows the SStreamMsgGrpHeader is decoded into an
// SMStreamHbRspMsg and passed to streamHbProcessRspMsg(). A decode failure
// is reported to streamHbHandleRspErr() as TSDB_CODE_INVALID_MSG after the
// partially decoded message has been freed.
//
// Returns whatever the invoked stream handler returns.
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int64_t currTs = taosGetTimestampMs();

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, currTs);
  }

  // Skip the group header that precedes the encoded heartbeat response.
  char   *payload = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t payloadLen = pMsg->contLen - sizeof(SStreamMsgGrpHeader);

  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  tDecoderInit(&dec, (uint8_t *)payload, payloadLen);

  if (tDecodeStreamHbRsp(&dec, &hbRsp) >= 0) {
    tDecoderClear(&dec);
    return streamHbProcessRspMsg(&hbRsp);
  }

  int32_t code = TSDB_CODE_INVALID_MSG;
  tDeepFreeSMStreamHbRspMsg(&hbRsp);
  tDecoderClear(&dec);
  dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
  return streamHbHandleRspErr(code, currTs);
}
963

964

965
SArray *dmGetMsgHandles() {
677,587✔
966
  int32_t code = -1;
677,587✔
967
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
677,587✔
968
  if (pArray == NULL) {
677,587✔
969
    return NULL;
×
970
  }
971

972
  // Requests handled by DNODE
973
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
974
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
975
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
976
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
977
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
978
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
979
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
980
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
981
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
982
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
983
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
984
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
985
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
986
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
987
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
988

989
  // Requests handled by MNODE
990
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
991
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
992
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
677,587✔
993

994
  code = 0;
677,587✔
995

996
_OVER:
677,587✔
997
  if (code != 0) {
677,587✔
998
    taosArrayDestroy(pArray);
×
999
    return NULL;
×
1000
  } else {
1001
    return pArray;
677,587✔
1002
  }
1003
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc