• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.25
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24
#include "stream.h"
25

26
// Configuration object declared here and defined in another translation unit;
// this file passes it to taosGetGlobalCfg(), cfgUpdateFromArray() and setAllConfigs().
extern SConfig *tsCfg;
27

28
// Cached status snapshot shared between dmUpdateStatusInfo() (writer) and
// dmSendStatusReq() (reader). Both access these while holding
// pMgmt->pData->statusInfolock.
// tsVinfo.pVloads is handed over to the status request and reset to NULL there.
SMonVloadInfo tsVinfo = {0};
29
SMnodeLoad    tsMLoad = {0};
30
SDnodeData    tsDnodeData = {0};
31

32
// Adopt the dnode/cluster identity carried in pCfg.
//
// Does nothing when both the local dnodeId and clusterId are already non-zero.
// Otherwise the ids are copied into pMgmt->pData under the write lock, the dnode
// id is forwarded to the monitor and audit modules, and the result is persisted
// with dmWriteEps(). A persistence failure is logged but not propagated, since
// this function returns void.
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // A failed write is reported at error level, like every other failure in this file.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
2,458,243✔
49

50
// Compare the local ip-white-list version with the one reported in the status
// response and, when they differ, ask the mnode for the current list.
//
// When the versions match nothing is requested; if that shared version is 0 the
// white list is cleared on the server rpc via rpcSetIpWhite(..., NULL).
// All failures are logged; nothing is returned to the caller.
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveWhiteListReq req = {.ver = oldVer};
  int32_t               contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for ip white list request");
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
95

96

97

98
// Compare the local time-white-list version with the one reported in the status
// response and, when they differ, ask the mnode for the current list.
//
// When the versions match nothing is requested. All failures are logged;
// nothing is returned to the caller.
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
  if (pMgmt->pData->timeWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
      // NOTE(review): this path calls rpcSetIpWhite(), the same call used by the
      // ip-white-list handler; confirm that is intended for the time white list.
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable time white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->timeWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveWhiteListReq req = {.ver = oldVer};
  int32_t               contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for datetime white list request");
    return;
  }
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
  }
}
143

144

145

146
// Request the analytics algorithm list from the mnode when the version reported
// in the status response (newVer) differs from taosAnalyGetVersion().
// All failures are logged; nothing is returned to the caller.
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  int32_t                   contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for analysis function ver request");
    return;
  }
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  // NOTE(review): unlike the white-list requests in this file, .info.notFreeAhandle
  // is left at its zero default here; confirm that is intentional.
  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
185

186
// Handle the mnode's reply to a status request and release its payload.
//
// Success (code 0): decode the body if there is one, refresh the dnode
// config/endpoint list when dnodeVer changed, then run the ip-white-list,
// time-white-list and analytics version checks.
// Failure: only TSDB_CODE_MND_DNODE_NOT_EXIST is acted on. A dnode that is not
// yet marked dropped and has a positive id is marked dropped, persisted, and
// the process is sent SIGINT.
// pRsp->pCont is always released with rpcFreeCont() before returning.
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced explicitly below; presumably picked up by the dG* log macros.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};

    if (pRsp->pCont != NULL && pRsp->contLen > 0) {
      if (tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
        if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
          dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
                 statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
          pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
          dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
          dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
        }

        dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
        dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
        dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
      }
    }

    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
40,826,758✔
220

221
// Build a status request from the cached snapshot (tsDnodeData / tsVinfo /
// tsMLoad) and the current configuration, send it to the mnode synchronously
// with a timeout, and hand the reply to dmProcessStatusRsp().
//
// req.pVloads takes over the array held in tsVinfo while statusInfolock is
// held, so every exit path after that point must release it with
// tFreeSStatusReq().
// On a timeout, or when the reply carries a non-zero code, the mnode endpoint
// set is rotated.
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the log reports the real failure instead of code 0.
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Ownership of the vnode load array moves from the shared cache to this request.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req");
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  if (dDebugFlag & DEBUG_TRACE) {
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
  }
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      dmRotateMnodeEpSet(pMgmt->pData);
      char tbuf[512];
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
      // Report the send/recv failure itself, not rpcRsp.code.
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(code), tbuf,
            epSet.inUse);
    }
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
354

355
// Handle the mnode's reply to a config request and release its payload.
//
// Success (code 0): if the body decodes and the mnode reports the config version
// as not verified, the received global config is persisted and applied to tsCfg.
// The local config is then persisted (a failure there is only logged) and
// tsConfigInited is set. A failure while persisting or applying the global
// config skips the remaining steps, so tsConfigInited stays unset.
// Failure: only TSDB_CODE_MND_DNODE_NOT_EXIST is acted on. A dnode that is not
// yet marked dropped and has a positive id is marked dropped, persisted, and
// the process is sent SIGINT.
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // Not referenced explicitly below; presumably picked up by the dG* log macros.
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
  } else {
    bool needUpdate = false;
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
      // Try to use cfg from mnode sdb.
      if (!configRsp.isVersionVerified) {
        uInfo("config version not verified, update config");
        needUpdate = true;
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
        if (code != TSDB_CODE_SUCCESS) {
          dError("failed to persist global config since %s", tstrerror(code));
          goto _exit;
        }
      }
    }
    if (needUpdate) {
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to update config since %s", tstrerror(code));
        goto _exit;
      }
      code = setAllConfigs(tsCfg);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to set all configs since %s", tstrerror(code));
        goto _exit;
      }
    }
    // Best effort: a failure to persist the local config does not block initialisation.
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
    tsConfigInited = 1;
  }
_exit:
  // The former needStop flag was never set, so its dmStop() branch was
  // unreachable and has been removed.
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
694,521✔
411

412
// Send the local global-config version to the mnode synchronously with a
// timeout and pass a successful reply to dmProcessConfigRsp().
//
// A transport failure or a non-zero reply code is logged and the function
// returns without processing the reply.
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a byte count here, not an error code, so it is logged as a size.
    dError("failed to malloc cont for config req, size:%d", contLen);
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // Release the reply payload here, as the status path does for error replies.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
465

466
// Refresh the cached status snapshot (tsDnodeData, tsVinfo, tsMLoad) that
// dmSendStatusReq() later reads.
//
// Dnode identity fields are copied under the pData read lock, vnode and mnode
// loads are collected through the registered callbacks, and the results are
// published under statusInfolock. A freshly collected vnode load array is only
// stored when the cache slot is empty; otherwise it is destroyed.
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected array is owned locally and must not leak on this early exit.
    taosArrayDestroy(vinfo.pVloads);
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache slot is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // A previous array has not been consumed yet: keep it and drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
513

514
// Serialize pReq and send it to the mnode as a TDMT_MND_NOTIFY message that
// expects no response (info.noResp = 1). Failures are logged only.
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a NULL buffer would be serialized into and sent.
    dError("failed to malloc cont for notify req");
    return;
  }
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
543

544
// Placeholder handler for auth responses: logs that they are unsupported and
// reports success. Neither argument is read.
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("auth rsp is received, but not supported yet");

  return 0;
}
548

549
// Placeholder handler for grant responses: logs that they are unsupported and
// reports success. Neither argument is read.
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("grant rsp is received, but not supported yet");

  return 0;
}
553

554
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
90,652✔
555
  int32_t       code = 0;
90,652✔
556
  SDCfgDnodeReq cfgReq = {0};
90,652✔
557
  SConfig      *pCfg = taosGetCfg();
90,652✔
558
  SConfigItem  *pItem = NULL;
90,652✔
559

560
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
90,652✔
561
    return TSDB_CODE_INVALID_MSG;
×
562
  }
563
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
90,652✔
564
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
578✔
565
  }
566

567
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
90,074✔
568

569
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
90,074✔
570
  if (code != 0) {
90,074✔
571
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
496✔
572
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
496✔
573
      return TSDB_CODE_SUCCESS;
496✔
574
    } else {
575
      return code;
×
576
    }
577
  }
578
  if (pItem == NULL) {
89,578✔
579
    return TSDB_CODE_CFG_NOT_FOUND;
×
580
  }
581

582
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
89,578✔
583
    char value[10] = {0};
×
584
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
585
      tsSyncTimeout = 0;
×
586
    }
587

588
    if (tsSyncTimeout > 0) {
×
589
      SConfigItem *pItemTmp = NULL;
×
590
      char         tmp[10] = {0};
×
591

592
      sprintf(tmp, "%d", tsSyncTimeout);
×
593
      TAOS_CHECK_RETURN(
×
594
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
595
      if (pItemTmp == NULL) {
×
596
        return TSDB_CODE_CFG_NOT_FOUND;
×
597
      }
598

599
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
600
      TAOS_CHECK_RETURN(
×
601
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
602
      if (pItemTmp == NULL) {
×
603
        return TSDB_CODE_CFG_NOT_FOUND;
×
604
      }
605
      TAOS_CHECK_RETURN(
×
606
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
607
      if (pItemTmp == NULL) {
×
608
        return TSDB_CODE_CFG_NOT_FOUND;
×
609
      }
610

611
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
612
      TAOS_CHECK_RETURN(
×
613
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
614
      if (pItemTmp == NULL) {
×
615
        return TSDB_CODE_CFG_NOT_FOUND;
×
616
      }
617
      TAOS_CHECK_RETURN(
×
618
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
619
      if (pItemTmp == NULL) {
×
620
        return TSDB_CODE_CFG_NOT_FOUND;
×
621
      }
622
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
623
      if (pItemTmp == NULL) {
×
624
        return TSDB_CODE_CFG_NOT_FOUND;
×
625
      }
626

627
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
628
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
629
      if (pItemTmp == NULL) {
×
630
        return TSDB_CODE_CFG_NOT_FOUND;
×
631
      }
632

633
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
634
      TAOS_CHECK_RETURN(
×
635
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
636
      if (pItemTmp == NULL) {
×
637
        return TSDB_CODE_CFG_NOT_FOUND;
×
638
      }
639
      TAOS_CHECK_RETURN(
×
640
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
641
      if (pItemTmp == NULL) {
×
642
        return TSDB_CODE_CFG_NOT_FOUND;
×
643
      }
644
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
645
      if (pItemTmp == NULL) {
×
646
        return TSDB_CODE_CFG_NOT_FOUND;
×
647
      }
648

649
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
650
            tsSyncTimeout);
651
    }
652
  }
653

654
  if (!isConifgItemLazyMode(pItem)) {
89,578✔
655
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
88,691✔
656

657
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
88,691✔
658
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
659
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
660
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
661

662
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
663
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
664
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
665
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
666

667
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
668
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
669
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
670

671
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
672
            tsSyncTimeout);
673
    }
674
  }
675

676
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
89,578✔
677
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
21,108✔
678
    if (code != TSDB_CODE_SUCCESS) {
21,108✔
679
      dError("failed to persist global config since %s", tstrerror(code));
×
680
    }
681
  } else {
682
    code = taosPersistLocalConfig(pMgmt->path);
68,470✔
683
    if (code != TSDB_CODE_SUCCESS) {
68,470✔
684
      dError("failed to persist local config since %s", tstrerror(code));
×
685
    }
686
  }
687

688
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
89,578✔
689
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
690

691
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
692
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
693
  }
694

695
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
89,578✔
696
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
89,578✔
697
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
698
  }
699

700
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
89,578✔
701
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
89,578✔
702
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
703
  }
704

705
  if (cfgReq.version > 0) {
89,578✔
706
    tsdmConfigVersion = cfgReq.version;
36,368✔
707
  }
708
  return code;
89,578✔
709
}
710

711
// Install a new encryption key delivered in a dnode config request.
//
// Enterprise builds decode the request, pass the key to dmUpdateEncryptKey() and,
// on success, refresh the cached key, its checksum and the key state. The result
// code is also written to pMsg->code, and the response fields are cleared.
// Other builds return 0 without reading the arguments.
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq cfgReq = {0};
  int32_t       code = TSDB_CODE_INVALID_MSG;  // kept if the request cannot be decoded

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) == 0) {
    code = dmUpdateEncryptKey(cfgReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  return 0;
#endif
}
736

737
/*
 * Fill pStatus with the run status of this dnode.
 *
 * The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
 * downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED when the local mnode (if
 * any) or the first vnode found reports a sync state of
 * TAOS_SYNC_STATE_ERROR or TAOS_SYNC_STATE_OFFLINE; details then names the
 * offending node and its sync state.
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&mnodeInfo);

  const bool mnodeUnhealthy = mnodeInfo.isMnode && (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR ||
                                                    mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE);
  if (mnodeUnhealthy) {
    // The mnode problem is reported on its own; vnode loads are not fetched.
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
             syncStr(mnodeInfo.load.syncState));
    return;
  }

  SMonVloadInfo vnodeInfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vnodeInfo);

  const size_t numOfVloads = taosArrayGetSize(vnodeInfo.pVloads);
  for (size_t idx = 0; idx < numOfVloads; ++idx) {
    SVnodeLoad *pVload = taosArrayGet(vnodeInfo.pVloads, idx);
    if (pVload->syncState != TAOS_SYNC_STATE_ERROR && pVload->syncState != TAOS_SYNC_STATE_OFFLINE) {
      continue;
    }

    // Only the first unhealthy vnode is reported.
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVload->vgId,
             syncStr(pVload->syncState));
    break;
  }

  taosArrayDestroy(vnodeInfo.pVloads);
}
764

765
/*
 * Handle a server-run-status request.
 *
 * Collects the current status through dmGetServerRunStatus(), serializes it
 * into a buffer obtained from rpcMallocCont() and attaches that buffer to
 * pMsg->info.rsp / pMsg->info.rspLen.
 *
 * Returns 0 on success. On failure pMsg->info.rsp stays NULL, rspLen stays 0
 * and one of the following is returned:
 *   - TSDB_CODE_OUT_OF_MEMORY when the sizing pass fails,
 *   - terrno when the response buffer cannot be allocated,
 *   - TSDB_CODE_INVALID_MSG when writing into the buffer fails.
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");

  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // Sizing pass: called without a buffer, the result is used as the allocation size.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer has not been handed to pMsg yet, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
798

799
/*
 * Allocate an empty data block whose columns mirror the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * The schema is looked up by name in the table list returned by
 * getInfosDbMeta(); when no entry matches, the first entry is used. One
 * SColumnInfoData is pushed per schema column, with column ids starting at 1.
 *
 * On success *ppBlock receives the new block and 0 is returned. On failure
 * the partially built block is destroyed, *ppBlock is left untouched and the
 * terrno value observed at the failing call is returned.
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNewBlock == NULL) {
    return terrno;
  }

  int32_t              code = 0;
  size_t               numOfTables = 0;
  const SSysTableMeta *pTables = NULL;
  getInfosDbMeta(&pTables, &numOfTables);

  // Defaults to the first table when the name is not found.
  const SSysTableMeta *pVarMeta = &pTables[0];
  for (int32_t t = 0; t < numOfTables; ++t) {
    if (strcmp(pTables[t].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      pVarMeta = &pTables[t];
      break;
    }
  }

  pNewBlock->pDataBlock = taosArrayInit(pVarMeta->colNum, sizeof(SColumnInfoData));
  if (pNewBlock->pDataBlock == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t col = 0; col < pVarMeta->colNum; ++col) {
    SColumnInfoData column = {.info = {.colId = col + 1,
                                       .type = pVarMeta->schema[col].type,
                                       .bytes = pVarMeta->schema[col].bytes}};
    if (taosArrayPush(pNewBlock->pDataBlock, &column) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  pNewBlock->info.hasVarCol = true;

_exit:
  if (code == 0) {
    *ppBlock = pNewBlock;
  } else {
    blockDataDestroy(pNewBlock);
  }
  return code;
}
846

847
/*
 * Populate pBlock with the configuration dump and stamp it with dnodeId.
 *
 * dumpConfToDataBlock() fills the block first; afterwards the first column
 * (index 0) is written through colDataSetNItems() using dnodeId and the
 * block's current row count.
 *
 * Returns 0 on success, the error from dumpConfToDataBlock() or
 * colDataSetNItems(), or TSDB_CODE_OUT_OF_RANGE when the block has no first
 * column.
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);

  if (code == 0) {
    SColumnInfoData *pDnodeIdCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pDnodeIdCol != NULL) {
      code = colDataSetNItems(pDnodeIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
    } else {
      code = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return code;
}
860

861
/*
 * Handle a system-table retrieve request addressed to this dnode.
 *
 * Only TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is
 * served; any other table name is rejected with TSDB_CODE_INVALID_MSG.
 *
 * The response buffer is laid out as:
 *   SRetrieveMetaTableRsp header
 *   int32_t               number of columns (network byte order)
 *   SSysTableSchema[]     one entry per column
 *   encoded data block    produced by blockEncode()
 *
 * On success the buffer is attached to pMsg->info.rsp / rspLen and
 * TSDB_CODE_SUCCESS is returned. On failure every intermediate resource is
 * released and the error code is returned; pMsg->info is not modified.
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl((uint32_t)numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  // Column schemas follow the column count, one fixed-size entry per column.
  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before logging and cleanup: those calls may overwrite
    // it, and `code` is still 0 at this point.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
936

937
/*
 * Handle the response to a stream heartbeat.
 *
 * A non-zero pMsg->code is forwarded straight to streamHbHandleRspErr().
 * Otherwise the payload that follows the SStreamMsgGrpHeader is decoded into
 * an SMStreamHbRspMsg and passed to streamHbProcessRspMsg().
 *
 * A message with no body, or one shorter than the group header, is treated
 * like a decode failure: TSDB_CODE_INVALID_MSG is reported through
 * streamHbHandleRspErr().
 *
 * Returns whatever streamHbHandleRspErr() or streamHbProcessRspMsg() returns.
 */
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMStreamHbRspMsg rsp = {0};
  int32_t          code = 0;
  SDecoder         decoder = {0};
  int64_t          currTs = taosGetTimestampMs();

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, currTs);
  }

  // Validate before skipping the header: otherwise the payload length would
  // go negative and the payload pointer would fall outside the message body.
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SStreamMsgGrpHeader)) {
    code = TSDB_CODE_INVALID_MSG;
    dError("invalid stream hb rsp msg, contLen:%d, error:%s", pMsg->contLen, tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t len = pMsg->contLen - (int32_t)sizeof(SStreamMsgGrpHeader);

  tDecoderInit(&decoder, (uint8_t *)msg, len);
  code = tDecodeStreamHbRsp(&decoder, &rsp);
  if (code < 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Release anything the decoder allocated before it failed.
    tDeepFreeSMStreamHbRspMsg(&rsp);
    tDecoderClear(&decoder);
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
    return streamHbHandleRspErr(code, currTs);
  }

  tDecoderClear(&decoder);

  return streamHbProcessRspMsg(&rsp);
}
963

964

965
SArray *dmGetMsgHandles() {
699,573✔
966
  int32_t code = -1;
699,573✔
967
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
699,573✔
968
  if (pArray == NULL) {
699,573✔
969
    return NULL;
×
970
  }
971

972
  // Requests handled by DNODE
973
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
974
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
975
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
976
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
977
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
978
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
979
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
980
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
981
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
982
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
983
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
984
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
985
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
986
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
987
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
988

989
  // Requests handled by MNODE
990
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
991
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
992
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
699,573✔
993

994
  code = 0;
699,573✔
995

996
_OVER:
699,573✔
997
  if (code != 0) {
699,573✔
998
    taosArrayDestroy(pArray);
×
999
    return NULL;
×
1000
  } else {
1001
    return pArray;
699,573✔
1002
  }
1003
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc