• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4687

25 Aug 2025 07:22AM UTC coverage: 57.894% (-2.2%) from 60.092%
#4687

push

travis-ci

web-flow
fix: add taosBenchmark windows support params (#32708)

132643 of 292257 branches covered (45.39%)

Branch coverage included in aggregate %.

201266 of 284501 relevant lines covered (70.74%)

4743408.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.73
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24
#include "stream.h"
25

26
extern SConfig *tsCfg;
27

28
SMonVloadInfo tsVinfo = {0};
29
SMnodeLoad    tsMLoad = {0};
30
SDnodeData    tsDnodeData = {0};
31

32
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
9,420✔
33
  int32_t code = 0;
9,420✔
34
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
9,420✔
35
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
1,905!
36
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
1,905✔
37
    pMgmt->pData->dnodeId = pCfg->dnodeId;
1,905✔
38
    pMgmt->pData->clusterId = pCfg->clusterId;
1,905✔
39
    monSetDnodeId(pCfg->dnodeId);
1,905✔
40
    auditSetDnodeId(pCfg->dnodeId);
1,905✔
41
    code = dmWriteEps(pMgmt->pData);
1,905✔
42
    if (code != 0) {
1,905!
43
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
×
44
            tstrerror(code));
45
    }
46
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
1,905✔
47
  }
48
}
9,420✔
49

50
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
9,776✔
51
  int32_t code = 0;
9,776✔
52
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
9,776✔
53
  if (pMgmt->pData->ipWhiteVer == ver) {
9,776✔
54
    if (ver == 0) {
9,769✔
55
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
9,757✔
56
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
9,757!
57
        dError("failed to disable ip white list on dnode");
×
58
      }
59
    }
60
    return;
9,769✔
61
  }
62
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
7✔
63

64
  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
7✔
65
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
7✔
66
  if (contLen < 0) {
7!
67
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
68
    return;
×
69
  }
70
  void *pHead = rpcMallocCont(contLen);
7✔
71
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
7✔
72
  if (contLen < 0) {
7!
73
    rpcFreeCont(pHead);
×
74
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
75
    return;
×
76
  }
77

78
  SRpcMsg rpcMsg = {.pCont = pHead,
7✔
79
                    .contLen = contLen,
80
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE_DUAL,
81
                    .info.ahandle = 0,
82
                    .info.notFreeAhandle = 1,
83
                    .info.refId = 0,
84
                    .info.noResp = 0,
85
                    .info.handle = 0};
86
  SEpSet  epset = {0};
7✔
87

88
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
7✔
89

90
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
7✔
91
  if (code != 0) {
7!
92
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
93
  }
94
}
95

96
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
9,776✔
97
  int32_t code = 0;
9,776✔
98
  int64_t oldVer = taosAnalyGetVersion();
9,776✔
99
  if (oldVer == newVer) return;
9,776!
100
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
101

102
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
103
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
104
  if (contLen < 0) {
×
105
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
106
    return;
×
107
  }
108

109
  void *pHead = rpcMallocCont(contLen);
×
110
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
111
  if (contLen < 0) {
×
112
    rpcFreeCont(pHead);
×
113
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
114
    return;
×
115
  }
116

117
  SRpcMsg rpcMsg = {
×
118
      .pCont = pHead,
119
      .contLen = contLen,
120
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
121
      .info.ahandle = 0,
122
      .info.refId = 0,
123
      .info.noResp = 0,
124
      .info.handle = 0,
125
  };
126
  SEpSet epset = {0};
×
127

128
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
129

130
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
131
  if (code != 0) {
×
132
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
133
  }
134
}
135

136
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
93,817✔
137
  const STraceId *trace = &pRsp->info.traceId;
93,817✔
138
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
93,817!
139

140
  if (pRsp->code != 0) {
93,817!
141
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
142
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
×
143
             pMgmt->statusSeq);
144
      pMgmt->pData->dropped = 1;
×
145
      if (dmWriteEps(pMgmt->pData) != 0) {
×
146
        dError("failed to write dnode file");
×
147
      }
148
      dInfo("dnode will exit since it is in the dropped state");
×
149
      (void)raise(SIGINT);
×
150
    }
151
  } else {
152
    SStatusRsp statusRsp = {0};
93,817✔
153
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
103,593!
154
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
9,776✔
155
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
9,776✔
156
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
9,420!
157
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
158
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
9,420✔
159
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
9,420✔
160
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
9,420✔
161
      }
162
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
9,776✔
163
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
9,776✔
164
    }
165
    tFreeSStatusRsp(&statusRsp);
93,817✔
166
  }
167
  rpcFreeCont(pRsp->pCont);
93,817✔
168
}
93,817✔
169

170
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
94,068✔
171
  int32_t    code = 0;
94,068✔
172
  SStatusReq req = {0};
94,068✔
173
  req.timestamp = taosGetTimestampMs();
94,068✔
174
  pMgmt->statusSeq++;
94,068✔
175

176
  dDebug("send status req to mnode, statusSeq:%d, begin to mgnt statusInfolock", pMgmt->statusSeq);
94,068✔
177
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
94,068!
178
    dError("failed to lock status info lock");
×
179
    return;
251✔
180
  }
181

182
  dDebug("send status req to mnode, statusSeq:%d, begin to get dnode info", pMgmt->statusSeq);
94,068✔
183
  req.sver = tsVersion;
94,068✔
184
  req.dnodeVer = tsDnodeData.dnodeVer;
94,068✔
185
  req.dnodeId = tsDnodeData.dnodeId;
94,068✔
186
  req.clusterId = tsDnodeData.clusterId;
94,068✔
187
  if (req.clusterId == 0) req.dnodeId = 0;
94,068✔
188
  req.rebootTime = tsDnodeData.rebootTime;
94,068✔
189
  req.updateTime = tsDnodeData.updateTime;
94,068✔
190
  req.numOfCores = tsNumOfCores;
94,068✔
191
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
94,068✔
192
  req.numOfDiskCfg = tsDiskCfgNum;
94,068✔
193
  req.memTotal = tsTotalMemoryKB * 1024;
94,068✔
194
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
94,068✔
195
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
94,068✔
196
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
94,068✔
197

198
  req.clusterCfg.statusInterval = tsStatusInterval;
94,068✔
199
  req.clusterCfg.checkTime = 0;
94,068✔
200
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
94,068✔
201
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
94,068✔
202
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
94,068✔
203
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
94,068✔
204
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
94,068✔
205
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
94,068✔
206
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
94,068✔
207
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
94,068✔
208
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
94,068✔
209
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
94,068✔
210
  char timestr[32] = "1970-01-01 00:00:00.00";
94,068✔
211
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL) !=
94,068!
212
      0) {
213
    dError("failed to parse time since %s", tstrerror(code));
×
214
  }
215
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
94,068✔
216
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
94,068✔
217
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
94,068✔
218

219
  dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
94,068✔
220

221
  req.pVloads = tsVinfo.pVloads;
94,068✔
222
  tsVinfo.pVloads = NULL;
94,068✔
223

224
  dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
94,068✔
225
  req.mload = tsMLoad;
94,068✔
226

227
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
94,068!
228
    dError("failed to unlock status info lock");
×
229
    return;
×
230
  }
231

232
  dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
94,068✔
233
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
94,068✔
234

235
  req.statusSeq = pMgmt->statusSeq;
94,068✔
236
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
94,068✔
237
  req.analVer = taosAnalyGetVersion();
94,068✔
238

239
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
94,068✔
240
  if (contLen < 0) {
94,068!
241
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
242
    return;
×
243
  }
244

245
  void *pHead = rpcMallocCont(contLen);
94,068✔
246
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
94,068✔
247
  if (contLen < 0) {
94,068!
248
    rpcFreeCont(pHead);
×
249
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
250
    return;
×
251
  }
252
  tFreeSStatusReq(&req);
94,068✔
253

254
  SRpcMsg rpcMsg = {.pCont = pHead,
94,068✔
255
                    .contLen = contLen,
256
                    .msgType = TDMT_MND_STATUS,
257
                    .info.ahandle = 0,
258
                    .info.notFreeAhandle = 1,
259
                    .info.refId = 0,
260
                    .info.noResp = 0,
261
                    .info.handle = 0};
262
  SRpcMsg rpcRsp = {0};
94,068✔
263

264
  dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
94,068✔
265

266
  SEpSet epSet = {0};
94,068✔
267
  int8_t epUpdated = 0;
94,068✔
268
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
94,068✔
269

270
  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
94,068✔
271
  code =
272
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
94,068✔
273
  if (code != 0) {
94,068✔
274
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
251!
275
    return;
251✔
276
  }
277

278
  if (rpcRsp.code != 0) {
93,817!
279
    dmRotateMnodeEpSet(pMgmt->pData);
×
280
    char tbuf[512];
281
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
×
282
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
×
283
          tbuf, epSet.inUse);
284
  } else {
285
    if (epUpdated == 1) {
93,817✔
286
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
1,132✔
287
    }
288
  }
289
  dmProcessStatusRsp(pMgmt, &rpcRsp);
93,817✔
290
}
291

292
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
2,363✔
293
  const STraceId *trace = &pRsp->info.traceId;
2,363✔
294
  int32_t         code = 0;
2,363✔
295
  SConfigRsp      configRsp = {0};
2,363✔
296
  bool            needStop = false;
2,363✔
297

298
  if (pRsp->code != 0) {
2,363!
299
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
300
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
301
      pMgmt->pData->dropped = 1;
×
302
      if (dmWriteEps(pMgmt->pData) != 0) {
×
303
        dError("failed to write dnode file");
×
304
      }
305
      dInfo("dnode will exit since it is in the dropped state");
×
306
      (void)raise(SIGINT);
×
307
    }
308
  } else {
309
    bool needUpdate = false;
2,363✔
310
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
4,726!
311
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
2,363✔
312
      // Try to use cfg from mnode sdb.
313
      if (!configRsp.isVersionVerified) {
2,363✔
314
        uInfo("config version not verified, update config");
1,913!
315
        needUpdate = true;
1,913✔
316
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
1,913✔
317
        if (code != TSDB_CODE_SUCCESS) {
1,913✔
318
          dError("failed to persist global config since %s", tstrerror(code));
2!
319
          goto _exit;
2✔
320
        }
321
      }
322
    }
323
    if (needUpdate) {
2,361✔
324
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
1,911✔
325
      if (code != TSDB_CODE_SUCCESS) {
1,911!
326
        dError("failed to update config since %s", tstrerror(code));
×
327
        goto _exit;
×
328
      }
329
      code = setAllConfigs(tsCfg);
1,911✔
330
      if (code != TSDB_CODE_SUCCESS) {
1,911✔
331
        dError("failed to set all configs since %s", tstrerror(code));
8!
332
        goto _exit;
8✔
333
      }
334
    }
335
    code = taosPersistLocalConfig(pMgmt->path);
2,353✔
336
    if (code != TSDB_CODE_SUCCESS) {
2,353!
337
      dError("failed to persist local config since %s", tstrerror(code));
×
338
    }
339
    tsConfigInited = 1;
2,353✔
340
  }
341
_exit:
2,363✔
342
  tFreeSConfigRsp(&configRsp);
2,363✔
343
  rpcFreeCont(pRsp->pCont);
2,363✔
344
  if (needStop) {
2,363!
345
    dmStop();
×
346
  }
347
}
2,363✔
348

349
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
2,371✔
350
  int32_t    code = 0;
2,371✔
351
  SConfigReq req = {0};
2,371✔
352

353
  req.cver = tsdmConfigVersion;
2,371✔
354
  req.forceReadConfig = tsForceReadConfig;
2,371✔
355
  req.array = taosGetGlobalCfg(tsCfg);
2,371✔
356
  dDebug("send config req to mnode, configVersion:%d", req.cver);
2,371✔
357

358
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
2,371✔
359
  if (contLen < 0) {
2,371!
360
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
361
    return;
8✔
362
  }
363

364
  void *pHead = rpcMallocCont(contLen);
2,371✔
365
  if (pHead == NULL) {
2,371!
366
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
367
    return;
×
368
  }
369
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
2,371✔
370
  if (contLen < 0) {
2,371!
371
    rpcFreeCont(pHead);
×
372
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
373
    return;
×
374
  }
375

376
  SRpcMsg rpcMsg = {.pCont = pHead,
2,371✔
377
                    .contLen = contLen,
378
                    .msgType = TDMT_MND_CONFIG,
379
                    .info.ahandle = 0,
380
                    .info.notFreeAhandle = 1,
381
                    .info.refId = 0,
382
                    .info.noResp = 0,
383
                    .info.handle = 0};
384
  SRpcMsg rpcRsp = {0};
2,371✔
385

386
  SEpSet epSet = {0};
2,371✔
387
  int8_t epUpdated = 0;
2,371✔
388
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
2,371✔
389

390
  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
2,371✔
391
  code =
392
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
2,371✔
393
  if (code != 0) {
2,371✔
394
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
8!
395
    return;
8✔
396
  }
397
  if (rpcRsp.code != 0) {
2,363!
398
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
399
    return;
×
400
  }
401
  dmProcessConfigRsp(pMgmt, &rpcRsp);
2,363✔
402
}
403

404
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
95,817✔
405
  dDebug("begin to get dnode info");
95,817✔
406
  SDnodeData dnodeData = {0};
95,817✔
407
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
95,817✔
408
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
95,817✔
409
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
95,817✔
410
  dnodeData.clusterId = pMgmt->pData->clusterId;
95,817✔
411
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
95,817✔
412
  dnodeData.updateTime = pMgmt->pData->updateTime;
95,817✔
413
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
95,817✔
414
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
95,817✔
415

416
  dDebug("begin to get vnode loads");
95,817✔
417
  SMonVloadInfo vinfo = {0};
95,817✔
418
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
95,817✔
419

420
  dDebug("begin to get mnode loads");
95,817✔
421
  SMonMloadInfo minfo = {0};
95,817✔
422
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
95,817✔
423

424
  dDebug("begin to lock status info");
95,817✔
425
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
95,817!
426
    dError("failed to lock status info lock");
×
427
    return;
×
428
  }
429
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
95,817✔
430
  tsDnodeData.dnodeId = dnodeData.dnodeId;
95,817✔
431
  tsDnodeData.clusterId = dnodeData.clusterId;
95,817✔
432
  tsDnodeData.rebootTime = dnodeData.rebootTime;
95,817✔
433
  tsDnodeData.updateTime = dnodeData.updateTime;
95,817✔
434
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
95,817✔
435

436
  if (tsVinfo.pVloads == NULL) {
95,817✔
437
    tsVinfo.pVloads = vinfo.pVloads;
92,036✔
438
    vinfo.pVloads = NULL;
92,036✔
439
  } else {
440
    taosArrayDestroy(vinfo.pVloads);
3,781✔
441
    vinfo.pVloads = NULL;
3,781✔
442
  }
443

444
  tsMLoad = minfo.load;
95,817✔
445

446
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
95,817!
447
    dError("failed to unlock status info lock");
×
448
    return;
×
449
  }
450
}
451

452
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
453
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
454
  if (contLen < 0) {
×
455
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
456
    return;
×
457
  }
458
  void *pHead = rpcMallocCont(contLen);
×
459
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
460
  if (contLen < 0) {
×
461
    rpcFreeCont(pHead);
×
462
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
463
    return;
×
464
  }
465

466
  SRpcMsg rpcMsg = {.pCont = pHead,
×
467
                    .contLen = contLen,
468
                    .msgType = TDMT_MND_NOTIFY,
469
                    .info.ahandle = 0,
470
                    .info.notFreeAhandle = 1,
471
                    .info.refId = 0,
472
                    .info.noResp = 1,
473
                    .info.handle = 0};
474

475
  SEpSet epSet = {0};
×
476
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
477
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
478
    dError("failed to send notify req");
×
479
  }
480
}
481

482
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
483
  dError("auth rsp is received, but not supported yet");
×
484
  return 0;
×
485
}
486

487
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
488
  dError("grant rsp is received, but not supported yet");
×
489
  return 0;
×
490
}
491

492
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
178✔
493
  int32_t       code = 0;
178✔
494
  SDCfgDnodeReq cfgReq = {0};
178✔
495
  SConfig      *pCfg = taosGetCfg();
178✔
496
  SConfigItem  *pItem = NULL;
178✔
497

498
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
178!
499
    return TSDB_CODE_INVALID_MSG;
×
500
  }
501
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
178✔
502
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
5✔
503
  }
504

505
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
173!
506

507
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
173✔
508
  if (code != 0) {
173✔
509
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
2!
510
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
2!
511
      return TSDB_CODE_SUCCESS;
2✔
512
    } else {
513
      return code;
×
514
    }
515
  }
516
  if (pItem == NULL) {
171!
517
    return TSDB_CODE_CFG_NOT_FOUND;
×
518
  }
519
  if (!isConifgItemLazyMode(pItem)) {
171✔
520
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
170!
521
  }
522

523
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
171✔
524
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
44✔
525
    if (code != TSDB_CODE_SUCCESS) {
44!
526
      dError("failed to persist global config since %s", tstrerror(code));
×
527
    }
528
  } else {
529
    code = taosPersistLocalConfig(pMgmt->path);
127✔
530
    if (code != TSDB_CODE_SUCCESS) {
127!
531
      dError("failed to persist local config since %s", tstrerror(code));
×
532
    }
533
  }
534
  if (cfgReq.version > 0) {
171✔
535
    tsdmConfigVersion = cfgReq.version;
94✔
536
  }
537
  return code;
171✔
538
}
539

540
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5✔
541
#ifdef TD_ENTERPRISE
542
  int32_t       code = 0;
5✔
543
  SDCfgDnodeReq cfgReq = {0};
5✔
544
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
5!
545
    code = TSDB_CODE_INVALID_MSG;
×
546
    goto _exit;
×
547
  }
548

549
  code = dmUpdateEncryptKey(cfgReq.value, true);
5✔
550
  if (code == 0) {
5!
551
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
5✔
552
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
5✔
553
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
5✔
554
  }
555

556
_exit:
×
557
  pMsg->code = code;
5✔
558
  pMsg->info.rsp = NULL;
5✔
559
  pMsg->info.rspLen = 0;
5✔
560
  return code;
5✔
561
#else
562
  return 0;
563
#endif
564
}
565

566
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
4✔
567
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
4✔
568
  pStatus->details[0] = 0;
4✔
569

570
  SMonMloadInfo minfo = {0};
4✔
571
  (*pMgmt->getMnodeLoadsFp)(&minfo);
4✔
572
  if (minfo.isMnode &&
4!
573
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
4!
574
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
575
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
576
    return;
×
577
  }
578

579
  SMonVloadInfo vinfo = {0};
4✔
580
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
4✔
581
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
12✔
582
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
8✔
583
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
8!
584
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
585
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
586
               syncStr(pLoad->syncState));
×
587
      break;
×
588
    }
589
  }
590

591
  taosArrayDestroy(vinfo.pVloads);
4✔
592
}
593

594
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4✔
595
  int32_t code = 0;
4✔
596
  dDebug("server run status req is received");
4!
597
  SServerStatusRsp statusRsp = {0};
4✔
598
  dmGetServerRunStatus(pMgmt, &statusRsp);
4✔
599

600
  pMsg->info.rsp = NULL;
4✔
601
  pMsg->info.rspLen = 0;
4✔
602

603
  SRpcMsg rspMsg = {.info = pMsg->info};
4✔
604
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
4✔
605
  if (rspLen < 0) {
4!
606
    return TSDB_CODE_OUT_OF_MEMORY;
×
607
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
608
    // return rspMsg.code;
609
  }
610

611
  void *pRsp = rpcMallocCont(rspLen);
4✔
612
  if (pRsp == NULL) {
4!
613
    return terrno;
×
614
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
615
    // return rspMsg.code;
616
  }
617

618
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
4✔
619
  if (rspLen < 0) {
4!
620
    return TSDB_CODE_INVALID_MSG;
×
621
  }
622

623
  pMsg->info.rsp = pRsp;
4✔
624
  pMsg->info.rspLen = rspLen;
4✔
625
  return 0;
4✔
626
}
627

628
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
280✔
629
  int32_t code = 0;
280✔
630

631
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
280!
632
  if (pBlock == NULL) {
280!
633
    return terrno;
×
634
  }
635

636
  size_t size = 0;
280✔
637

638
  const SSysTableMeta *pMeta = NULL;
280✔
639
  getInfosDbMeta(&pMeta, &size);
280✔
640

641
  int32_t index = 0;
280✔
642
  for (int32_t i = 0; i < size; ++i) {
5,600!
643
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
5,600✔
644
      index = i;
280✔
645
      break;
280✔
646
    }
647
  }
648

649
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
280✔
650
  if (pBlock->pDataBlock == NULL) {
280!
651
    code = terrno;
×
652
    goto _exit;
×
653
  }
654

655
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
1,960✔
656
    SColumnInfoData colInfoData = {0};
1,680✔
657
    colInfoData.info.colId = i + 1;
1,680✔
658
    colInfoData.info.type = pMeta[index].schema[i].type;
1,680✔
659
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
1,680✔
660
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
3,360!
661
      code = terrno;
×
662
      goto _exit;
×
663
    }
664
  }
665

666
  pBlock->info.hasVarCol = true;
280✔
667
_exit:
280✔
668
  if (code != 0) {
280!
669
    blockDataDestroy(pBlock);
×
670
  } else {
671
    *ppBlock = pBlock;
280✔
672
  }
673
  return code;
280✔
674
}
675

676
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
280✔
677
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);
280✔
678
  if (code != 0) {
280!
679
    return code;
×
680
  }
681

682
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
280✔
683
  if (pColInfo == NULL) {
280!
684
    return TSDB_CODE_OUT_OF_RANGE;
×
685
  }
686

687
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
280✔
688
}
689

690
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
280✔
691
  int32_t           size = 0;
280✔
692
  int32_t           rowsRead = 0;
280✔
693
  int32_t           code = 0;
280✔
694
  SRetrieveTableReq retrieveReq = {0};
280✔
695
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
280!
696
    return TSDB_CODE_INVALID_MSG;
×
697
  }
698
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
280!
699
#if 0
700
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
701
    code = TSDB_CODE_MND_NO_RIGHTS;
702
    return code;
703
  }
704
#endif
705
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
280!
706
    return TSDB_CODE_INVALID_MSG;
×
707
  }
708

709
  SSDataBlock *pBlock = NULL;
280✔
710
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
280!
711
    return code;
×
712
  }
713

714
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
280✔
715
  if (code != 0) {
280!
716
    blockDataDestroy(pBlock);
×
717
    return code;
×
718
  }
719

720
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
280✔
721
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
280✔
722
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
280✔
723

724
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
280✔
725
  if (pRsp == NULL) {
280!
726
    code = terrno;
×
727
    dError("failed to retrieve data since %s", tstrerror(code));
×
728
    blockDataDestroy(pBlock);
×
729
    return code;
×
730
  }
731

732
  char *pStart = pRsp->data;
280✔
733
  *(int32_t *)pStart = htonl(numOfCols);
280✔
734
  pStart += sizeof(int32_t);  // number of columns
280✔
735

736
  for (int32_t i = 0; i < numOfCols; ++i) {
1,960✔
737
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
1,680✔
738
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
1,680✔
739

740
    pSchema->bytes = htonl(pColInfo->info.bytes);
1,680✔
741
    pSchema->colId = htons(pColInfo->info.colId);
1,680✔
742
    pSchema->type = pColInfo->info.type;
1,680✔
743

744
    pStart += sizeof(SSysTableSchema);
1,680✔
745
  }
746

747
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
280✔
748
  if (len < 0) {
280!
749
    dError("failed to retrieve data since %s", tstrerror(code));
×
750
    blockDataDestroy(pBlock);
×
751
    rpcFreeCont(pRsp);
×
752
    return terrno;
×
753
  }
754

755
  pRsp->numOfRows = htonl(pBlock->info.rows);
280✔
756
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
280✔
757
  pRsp->completed = 1;
280✔
758
  pMsg->info.rsp = pRsp;
280✔
759
  pMsg->info.rspLen = size;
280✔
760
  dDebug("dnode variables retrieve completed");
280!
761

762
  blockDataDestroy(pBlock);
280✔
763
  return TSDB_CODE_SUCCESS;
280✔
764
}
765

766
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
31,777✔
767
  SMStreamHbRspMsg rsp = {0};
31,777✔
768
  int32_t          code = 0;
31,777✔
769
  SDecoder         decoder;
770
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
31,777✔
771
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
31,777✔
772
  int64_t          currTs = taosGetTimestampMs();
31,777✔
773

774
  if (pMsg->code) {
31,777✔
775
    return streamHbHandleRspErr(pMsg->code, currTs);
375✔
776
  }
777

778
  tDecoderInit(&decoder, (uint8_t*)msg, len);
31,402✔
779
  code = tDecodeStreamHbRsp(&decoder, &rsp);
31,402✔
780
  if (code < 0) {
31,402!
781
    code = TSDB_CODE_INVALID_MSG;
×
782
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
783
    tDecoderClear(&decoder);
×
784
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
785
    return streamHbHandleRspErr(code, currTs);
×
786
  }
787

788
  tDecoderClear(&decoder);
31,402✔
789

790
  return streamHbProcessRspMsg(&rsp);
31,402✔
791
}
792

793

794
SArray *dmGetMsgHandles() {
2,384✔
795
  int32_t code = -1;
2,384✔
796
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
2,384✔
797
  if (pArray == NULL) {
2,384!
798
    return NULL;
×
799
  }
800

801
  // Requests handled by DNODE
802
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
803
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
804
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
805
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
806
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
807
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
808
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
809
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
810
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
811
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
812
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
813
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
814
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
815
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
816
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
2,384!
817

818
  // Requests handled by MNODE
819
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
820
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
821
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,384!
822

823
  code = 0;
2,384✔
824

825
_OVER:
2,384✔
826
  if (code != 0) {
2,384!
827
    taosArrayDestroy(pArray);
×
828
    return NULL;
×
829
  } else {
830
    return pArray;
2,384✔
831
  }
832
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc