• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4804

16 Oct 2025 10:33AM UTC coverage: 61.259% (+0.1%) from 61.147%
#4804

push

travis-ci

happyguoxy
Merge branch 'cover/3.0' of github.com:taosdata/TDengine into cover/3.0

156021 of 324369 branches covered (48.1%)

Branch coverage included in aggregate %.

79 of 100 new or added lines in 19 files covered. (79.0%)

3318 existing lines in 125 files now uncovered.

207798 of 269534 relevant lines covered (77.1%)

168909799.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.58
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24
#include "stream.h"
25

26
extern SConfig *tsCfg;
27

28
SMonVloadInfo tsVinfo = {0};
29
SMnodeLoad    tsMLoad = {0};
30
SDnodeData    tsDnodeData = {0};
31

32
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
4,340,630✔
33
  int32_t code = 0;
4,340,630✔
34
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
4,340,630✔
35
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
925,416!
36
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
925,416✔
37
    pMgmt->pData->dnodeId = pCfg->dnodeId;
925,416✔
38
    pMgmt->pData->clusterId = pCfg->clusterId;
925,416✔
39
    monSetDnodeId(pCfg->dnodeId);
925,416✔
40
    auditSetDnodeId(pCfg->dnodeId);
925,416✔
41
    code = dmWriteEps(pMgmt->pData);
925,416✔
42
    if (code != 0) {
925,416✔
43
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
8,185!
44
            tstrerror(code));
45
    }
46
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
925,416✔
47
  }
48
}
4,340,630✔
49

50
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
4,597,731✔
51
  int32_t code = 0;
4,597,731✔
52
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
4,597,731✔
53
  if (pMgmt->pData->ipWhiteVer == ver) {
4,597,731✔
54
    if (ver == 0) {
4,593,144✔
55
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
4,587,529✔
56
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
4,587,529!
57
        dError("failed to disable ip white list on dnode");
×
58
      }
59
    }
60
    return;
4,593,144✔
61
  }
62
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
4,587✔
63

64
  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
4,587✔
65
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
4,587✔
66
  if (contLen < 0) {
4,587!
67
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
68
    return;
×
69
  }
70
  void *pHead = rpcMallocCont(contLen);
4,587✔
71
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
4,587✔
72
  if (contLen < 0) {
4,587!
73
    rpcFreeCont(pHead);
×
74
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
75
    return;
×
76
  }
77

78
  SRpcMsg rpcMsg = {.pCont = pHead,
4,587✔
79
                    .contLen = contLen,
80
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE_DUAL,
81
                    .info.ahandle = 0,
82
                    .info.notFreeAhandle = 1,
83
                    .info.refId = 0,
84
                    .info.noResp = 0,
85
                    .info.handle = 0};
86
  SEpSet  epset = {0};
4,587✔
87

88
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
4,587✔
89

90
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
4,587✔
91
  if (code != 0) {
4,587!
92
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
93
  }
94
}
95

96
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
4,597,731✔
97
  int32_t code = 0;
4,597,731✔
98
  int64_t oldVer = taosAnalyGetVersion();
4,597,731✔
99
  if (oldVer == newVer) return;
4,597,731!
100
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
101

102
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
103
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
104
  if (contLen < 0) {
×
105
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
106
    return;
×
107
  }
108

109
  void *pHead = rpcMallocCont(contLen);
×
110
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
111
  if (contLen < 0) {
×
112
    rpcFreeCont(pHead);
×
113
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
114
    return;
×
115
  }
116

117
  SRpcMsg rpcMsg = {
×
118
      .pCont = pHead,
119
      .contLen = contLen,
120
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
121
      .info.ahandle = 0,
122
      .info.refId = 0,
123
      .info.noResp = 0,
124
      .info.handle = 0,
125
  };
126
  SEpSet epset = {0};
×
127

128
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
129

130
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
131
  if (code != 0) {
×
132
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
133
  }
134
}
135

136
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
65,632,629✔
137
  const STraceId *trace = &pRsp->info.traceId;
65,632,629✔
138
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
65,632,629!
139

140
  if (pRsp->code != 0) {
65,632,629!
141
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
142
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
×
143
             pMgmt->statusSeq);
144
      pMgmt->pData->dropped = 1;
×
145
      if (dmWriteEps(pMgmt->pData) != 0) {
×
146
        dError("failed to write dnode file");
×
147
      }
148
      dInfo("dnode will exit since it is in the dropped state");
×
149
      (void)raise(SIGINT);
×
150
    }
151
  } else {
152
    SStatusRsp statusRsp = {0};
65,632,629✔
153
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
70,230,360!
154
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
4,597,731✔
155
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
4,597,731✔
156
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
4,340,630!
157
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
158
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
4,340,630✔
159
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
4,340,630✔
160
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
4,340,630✔
161
      }
162
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
4,597,731✔
163
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
4,597,731✔
164
    }
165
    tFreeSStatusRsp(&statusRsp);
65,632,629✔
166
  }
167
  rpcFreeCont(pRsp->pCont);
65,632,629✔
168
}
65,632,629✔
169

170
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
65,796,978✔
171
  int32_t    code = 0;
65,796,978✔
172
  SStatusReq req = {0};
65,796,978✔
173
  req.timestamp = taosGetTimestampMs();
65,796,978✔
174
  pMgmt->statusSeq++;
65,796,978✔
175

176
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
65,796,978✔
177
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
65,796,978!
178
    dError("failed to lock status info lock");
×
UNCOV
179
    return;
×
180
  }
181

182
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
65,796,978✔
183
  req.sver = tsVersion;
65,796,978✔
184
  req.dnodeVer = tsDnodeData.dnodeVer;
65,796,978✔
185
  req.dnodeId = tsDnodeData.dnodeId;
65,796,978✔
186
  req.clusterId = tsDnodeData.clusterId;
65,796,978✔
187
  if (req.clusterId == 0) req.dnodeId = 0;
65,796,978✔
188
  req.rebootTime = tsDnodeData.rebootTime;
65,796,978✔
189
  req.updateTime = tsDnodeData.updateTime;
65,796,978✔
190
  req.numOfCores = tsNumOfCores;
65,796,978✔
191
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
65,796,978✔
192
  req.numOfDiskCfg = tsDiskCfgNum;
65,796,978✔
193
  req.memTotal = tsTotalMemoryKB * 1024;
65,796,978✔
194
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
65,796,978✔
195
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
65,796,978!
196
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
65,796,978✔
197

198
  req.clusterCfg.statusInterval = tsStatusInterval;
65,796,978✔
199
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
65,796,978✔
200
  req.clusterCfg.checkTime = 0;
65,796,978✔
201
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
65,796,978!
202
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
65,796,978!
203
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
65,796,978✔
204
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
65,796,978✔
205
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
65,796,978!
206
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
65,796,978✔
207
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
65,796,978✔
208
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
65,796,978✔
209
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
65,796,978✔
210
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
65,796,978!
211
  char timestr[32] = "1970-01-01 00:00:00.00";
65,796,978✔
212
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL) !=
65,796,978!
213
      0) {
214
    dError("failed to parse time since %s", tstrerror(code));
×
215
  }
216
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
65,796,978✔
217
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
65,796,978✔
218
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
65,796,978✔
219

220
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
65,796,978✔
221

222
  req.pVloads = tsVinfo.pVloads;
65,796,978✔
223
  tsVinfo.pVloads = NULL;
65,796,978✔
224

225
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
65,796,978✔
226
  req.mload = tsMLoad;
65,796,978✔
227

228
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
65,796,978!
229
    dError("failed to unlock status info lock");
×
230
    return;
×
231
  }
232

233
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
65,796,978✔
234
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
65,796,978✔
235

236
  req.statusSeq = pMgmt->statusSeq;
65,796,978✔
237
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
65,796,978✔
238
  req.analVer = taosAnalyGetVersion();
65,796,978✔
239

240
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
65,796,978✔
241
  if (contLen < 0) {
65,796,978!
242
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
243
    return;
×
244
  }
245

246
  void *pHead = rpcMallocCont(contLen);
65,796,978✔
247
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
65,796,978✔
248
  if (contLen < 0) {
65,796,978!
249
    rpcFreeCont(pHead);
×
250
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
251
    return;
×
252
  }
253
  tFreeSStatusReq(&req);
65,796,978✔
254

255
  SRpcMsg rpcMsg = {.pCont = pHead,
65,796,978✔
256
                    .contLen = contLen,
257
                    .msgType = TDMT_MND_STATUS,
258
                    .info.ahandle = 0,
259
                    .info.notFreeAhandle = 1,
260
                    .info.refId = 0,
261
                    .info.noResp = 0,
262
                    .info.handle = 0};
263
  SRpcMsg rpcRsp = {0};
65,796,978✔
264

265
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
65,796,978✔
266

267
  SEpSet epSet = {0};
65,796,978✔
268
  int8_t epUpdated = 0;
65,796,978✔
269
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
65,796,978✔
270

271
  if (dDebugFlag & DEBUG_TRACE) {
65,796,978✔
272
    char tbuf[512];
1,429,737✔
273
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
1,430,067✔
274
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
1,430,067!
275
  }
276
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
65,796,978✔
277
  if (code != 0) {
65,796,978✔
278
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
164,349!
279
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
164,349!
280
      dmRotateMnodeEpSet(pMgmt->pData);
164,349✔
281
      char tbuf[512];
164,349✔
282
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
164,349✔
283
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
164,349!
284
            tbuf, epSet.inUse);
285
    }
286
    return;
164,349✔
287
  }
288

289
  if (rpcRsp.code != 0) {
65,632,629!
UNCOV
290
    dmRotateMnodeEpSet(pMgmt->pData);
×
UNCOV
291
    char tbuf[512];
×
UNCOV
292
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
×
UNCOV
293
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
×
294
          tbuf, epSet.inUse);
295
  } else {
296
    if (epUpdated == 1) {
65,632,629✔
297
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
497,742✔
298
    }
299
  }
300
  dmProcessStatusRsp(pMgmt, &rpcRsp);
65,632,629✔
301
}
302

303
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
1,198,397✔
304
  const STraceId *trace = &pRsp->info.traceId;
1,198,397✔
305
  int32_t         code = 0;
1,198,397✔
306
  SConfigRsp      configRsp = {0};
1,198,397✔
307
  bool            needStop = false;
1,198,397✔
308

309
  if (pRsp->code != 0) {
1,198,397!
UNCOV
310
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
UNCOV
311
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
UNCOV
312
      pMgmt->pData->dropped = 1;
×
UNCOV
313
      if (dmWriteEps(pMgmt->pData) != 0) {
×
UNCOV
314
        dError("failed to write dnode file");
×
315
      }
UNCOV
316
      dInfo("dnode will exit since it is in the dropped state");
×
UNCOV
317
      (void)raise(SIGINT);
×
318
    }
319
  } else {
320
    bool needUpdate = false;
1,198,397✔
321
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
2,396,794!
322
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
1,198,397✔
323
      // Try to use cfg from mnode sdb.
324
      if (!configRsp.isVersionVerified) {
1,198,397✔
325
        uInfo("config version not verified, update config");
928,752!
326
        needUpdate = true;
928,752✔
327
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
928,752✔
328
        if (code != TSDB_CODE_SUCCESS) {
928,752✔
329
          dError("failed to persist global config since %s", tstrerror(code));
90!
330
          goto _exit;
90✔
331
        }
332
      }
333
    }
334
    if (needUpdate) {
1,198,307✔
335
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
928,662✔
336
      if (code != TSDB_CODE_SUCCESS) {
928,662!
UNCOV
337
        dError("failed to update config since %s", tstrerror(code));
×
UNCOV
338
        goto _exit;
×
339
      }
340
      code = setAllConfigs(tsCfg);
928,662✔
341
      if (code != TSDB_CODE_SUCCESS) {
928,662✔
342
        dError("failed to set all configs since %s", tstrerror(code));
240!
343
        goto _exit;
240✔
344
      }
345
    }
346
    code = taosPersistLocalConfig(pMgmt->path);
1,198,067✔
347
    if (code != TSDB_CODE_SUCCESS) {
1,198,067!
348
      dError("failed to persist local config since %s", tstrerror(code));
×
349
    }
350
    tsConfigInited = 1;
1,198,067✔
351
  }
352
_exit:
1,198,397✔
353
  tFreeSConfigRsp(&configRsp);
1,198,397✔
354
  rpcFreeCont(pRsp->pCont);
1,198,397✔
355
  if (needStop) {
1,198,397!
UNCOV
356
    dmStop();
×
357
  }
358
}
1,198,397✔
359

360
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
1,199,653✔
361
  int32_t    code = 0;
1,199,653✔
362
  SConfigReq req = {0};
1,199,653✔
363

364
  req.cver = tsdmConfigVersion;
1,199,653✔
365
  req.forceReadConfig = tsForceReadConfig;
1,199,653✔
366
  req.array = taosGetGlobalCfg(tsCfg);
1,199,653✔
367
  dDebug("send config req to mnode, configVersion:%d", req.cver);
1,199,653✔
368

369
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
1,199,653✔
370
  if (contLen < 0) {
1,199,653!
UNCOV
371
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
UNCOV
372
    return;
×
373
  }
374

375
  void *pHead = rpcMallocCont(contLen);
1,199,653✔
376
  if (pHead == NULL) {
1,199,653!
UNCOV
377
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
UNCOV
378
    return;
×
379
  }
380
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
1,199,653✔
381
  if (contLen < 0) {
1,199,653!
382
    rpcFreeCont(pHead);
×
UNCOV
383
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
UNCOV
384
    return;
×
385
  }
386

387
  SRpcMsg rpcMsg = {.pCont = pHead,
1,199,653✔
388
                    .contLen = contLen,
389
                    .msgType = TDMT_MND_CONFIG,
390
                    .info.ahandle = 0,
391
                    .info.notFreeAhandle = 1,
392
                    .info.refId = 0,
393
                    .info.noResp = 0,
394
                    .info.handle = 0};
395
  SRpcMsg rpcRsp = {0};
1,199,653✔
396

397
  SEpSet epSet = {0};
1,199,653✔
398
  int8_t epUpdated = 0;
1,199,653✔
399
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
1,199,653✔
400

401
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
1,199,653✔
402
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
1,199,653✔
403
  if (code != 0) {
1,199,653✔
404
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
1,256!
405
    return;
1,256✔
406
  }
407
  if (rpcRsp.code != 0) {
1,198,397!
UNCOV
408
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
UNCOV
409
    return;
×
410
  }
411
  dmProcessConfigRsp(pMgmt, &rpcRsp);
1,198,397✔
412
}
413

414
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
66,814,457✔
415
  dDebug("begin to get dnode info");
66,814,457✔
416
  SDnodeData dnodeData = {0};
66,814,457✔
417
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
66,814,457✔
418
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
66,814,457✔
419
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
66,814,457✔
420
  dnodeData.clusterId = pMgmt->pData->clusterId;
66,814,457✔
421
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
66,814,457✔
422
  dnodeData.updateTime = pMgmt->pData->updateTime;
66,814,457✔
423
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
66,814,457!
424
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
66,814,457✔
425

426
  dDebug("begin to get vnode loads");
66,814,457✔
427
  SMonVloadInfo vinfo = {0};
66,814,457✔
428
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
66,814,457✔
429

430
  dDebug("begin to get mnode loads");
66,814,457✔
431
  SMonMloadInfo minfo = {0};
66,814,457✔
432
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
66,814,457✔
433

434
  dDebug("begin to lock status info");
66,814,457✔
435
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
66,814,457!
UNCOV
436
    dError("failed to lock status info lock");
×
UNCOV
437
    return;
×
438
  }
439
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
66,814,457✔
440
  tsDnodeData.dnodeId = dnodeData.dnodeId;
66,814,457✔
441
  tsDnodeData.clusterId = dnodeData.clusterId;
66,814,457✔
442
  tsDnodeData.rebootTime = dnodeData.rebootTime;
66,814,457✔
443
  tsDnodeData.updateTime = dnodeData.updateTime;
66,814,457✔
444
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
66,814,457!
445

446
  if (tsVinfo.pVloads == NULL) {
66,814,457✔
447
    tsVinfo.pVloads = vinfo.pVloads;
64,886,886✔
448
    vinfo.pVloads = NULL;
64,886,886✔
449
  } else {
450
    taosArrayDestroy(vinfo.pVloads);
1,927,571✔
451
    vinfo.pVloads = NULL;
1,927,571✔
452
  }
453

454
  tsMLoad = minfo.load;
66,814,457✔
455

456
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
66,814,457!
UNCOV
457
    dError("failed to unlock status info lock");
×
UNCOV
458
    return;
×
459
  }
460
}
461

UNCOV
462
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
UNCOV
463
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
UNCOV
464
  if (contLen < 0) {
×
UNCOV
465
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
UNCOV
466
    return;
×
467
  }
468
  void *pHead = rpcMallocCont(contLen);
×
469
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
UNCOV
470
  if (contLen < 0) {
×
UNCOV
471
    rpcFreeCont(pHead);
×
UNCOV
472
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
473
    return;
×
474
  }
475

476
  SRpcMsg rpcMsg = {.pCont = pHead,
×
477
                    .contLen = contLen,
478
                    .msgType = TDMT_MND_NOTIFY,
479
                    .info.ahandle = 0,
480
                    .info.notFreeAhandle = 1,
481
                    .info.refId = 0,
482
                    .info.noResp = 1,
483
                    .info.handle = 0};
484

UNCOV
485
  SEpSet epSet = {0};
×
UNCOV
486
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
487
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
UNCOV
488
    dError("failed to send notify req");
×
489
  }
490
}
491

UNCOV
492
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
UNCOV
493
  dError("auth rsp is received, but not supported yet");
×
UNCOV
494
  return 0;
×
495
}
496

497
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
498
  dError("grant rsp is received, but not supported yet");
×
499
  return 0;
×
500
}
501

502
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
146,007✔
503
  int32_t       code = 0;
146,007✔
504
  SDCfgDnodeReq cfgReq = {0};
146,007✔
505
  SConfig      *pCfg = taosGetCfg();
146,007✔
506
  SConfigItem  *pItem = NULL;
146,007✔
507

508
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
146,007!
509
    return TSDB_CODE_INVALID_MSG;
×
510
  }
511
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
146,007✔
512
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
1,872✔
513
  }
514

515
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
144,135!
516

517
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
144,135✔
518
  if (code != 0) {
144,135✔
519
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
1,112!
520
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
1,112!
521
      return TSDB_CODE_SUCCESS;
1,112✔
522
    } else {
UNCOV
523
      return code;
×
524
    }
525
  }
526
  if (pItem == NULL) {
143,023!
UNCOV
527
    return TSDB_CODE_CFG_NOT_FOUND;
×
528
  }
529

530
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
143,023!
UNCOV
531
    char value[10] = {0};
×
UNCOV
532
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
UNCOV
533
      tsSyncTimeout = 0;
×
534
    }
535

UNCOV
536
    if (tsSyncTimeout > 0) {
×
UNCOV
537
      SConfigItem *pItemTmp = NULL;
×
538
      char         tmp[10] = {0};
×
539

UNCOV
540
      sprintf(tmp, "%d", tsSyncTimeout);
×
UNCOV
541
      TAOS_CHECK_RETURN(
×
542
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
543
      if (pItemTmp == NULL) {
×
544
        return TSDB_CODE_CFG_NOT_FOUND;
×
545
      }
546

547
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
548
      TAOS_CHECK_RETURN(
×
549
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
UNCOV
550
      if (pItemTmp == NULL) {
×
551
        return TSDB_CODE_CFG_NOT_FOUND;
×
552
      }
UNCOV
553
      TAOS_CHECK_RETURN(
×
554
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
555
      if (pItemTmp == NULL) {
×
UNCOV
556
        return TSDB_CODE_CFG_NOT_FOUND;
×
557
      }
558

559
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
UNCOV
560
      TAOS_CHECK_RETURN(
×
561
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
562
      if (pItemTmp == NULL) {
×
UNCOV
563
        return TSDB_CODE_CFG_NOT_FOUND;
×
564
      }
UNCOV
565
      TAOS_CHECK_RETURN(
×
566
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
567
      if (pItemTmp == NULL) {
×
UNCOV
568
        return TSDB_CODE_CFG_NOT_FOUND;
×
569
      }
570
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
571
      if (pItemTmp == NULL) {
×
UNCOV
572
        return TSDB_CODE_CFG_NOT_FOUND;
×
573
      }
574

UNCOV
575
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
576
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
UNCOV
577
      if (pItemTmp == NULL) {
×
578
        return TSDB_CODE_CFG_NOT_FOUND;
×
579
      }
580

581
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
582
      TAOS_CHECK_RETURN(
×
583
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
UNCOV
584
      if (pItemTmp == NULL) {
×
UNCOV
585
        return TSDB_CODE_CFG_NOT_FOUND;
×
586
      }
587
      TAOS_CHECK_RETURN(
×
588
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
589
      if (pItemTmp == NULL) {
×
UNCOV
590
        return TSDB_CODE_CFG_NOT_FOUND;
×
591
      }
592
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
593
      if (pItemTmp == NULL) {
×
UNCOV
594
        return TSDB_CODE_CFG_NOT_FOUND;
×
595
      }
596

UNCOV
597
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
598
            tsSyncTimeout);
599
    }
600
  }
601

602
  if (!isConifgItemLazyMode(pItem)) {
143,023✔
603
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
142,320!
604

605
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
142,320!
UNCOV
606
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
UNCOV
607
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
608
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
609

UNCOV
610
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
UNCOV
611
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
UNCOV
612
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
UNCOV
613
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
614

UNCOV
615
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
UNCOV
616
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
617
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
618

619
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
620
            tsSyncTimeout);
621
    }
622
  }
623

624
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
143,023✔
625
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
23,733✔
626
    if (code != TSDB_CODE_SUCCESS) {
23,733!
627
      dError("failed to persist global config since %s", tstrerror(code));
×
628
    }
629
  } else {
630
    code = taosPersistLocalConfig(pMgmt->path);
119,290✔
631
    if (code != TSDB_CODE_SUCCESS) {
119,290!
UNCOV
632
      dError("failed to persist local config since %s", tstrerror(code));
×
633
    }
634
  }
635

636
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
143,023!
UNCOV
637
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
638

UNCOV
639
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
UNCOV
640
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
641
  }
642

643
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
143,023!
644
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
143,023!
UNCOV
645
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
646
  }
647

648
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
143,023!
649
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
143,023!
650
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
651
  }
652

653
  if (cfgReq.version > 0) {
143,023✔
654
    tsdmConfigVersion = cfgReq.version;
56,161✔
655
  }
656
  return code;
143,023✔
657
}
658

659
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,147✔
660
#ifdef TD_ENTERPRISE
661
  int32_t       code = 0;
3,147✔
662
  SDCfgDnodeReq cfgReq = {0};
3,147✔
663
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
3,147!
UNCOV
664
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
665
    goto _exit;
×
666
  }
667

668
  code = dmUpdateEncryptKey(cfgReq.value, true);
3,147✔
669
  if (code == 0) {
3,147!
670
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
3,147✔
671
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
3,147✔
672
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
3,147!
673
  }
674

675
_exit:
3,147✔
676
  pMsg->code = code;
3,147✔
677
  pMsg->info.rsp = NULL;
3,147✔
678
  pMsg->info.rspLen = 0;
3,147✔
679
  return code;
3,147✔
680
#else
681
  return 0;
682
#endif
683
}
684

685
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
2,281✔
686
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
2,281✔
687
  pStatus->details[0] = 0;
2,281✔
688

689
  SMonMloadInfo minfo = {0};
2,281✔
690
  (*pMgmt->getMnodeLoadsFp)(&minfo);
2,281✔
691
  if (minfo.isMnode &&
2,281!
692
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
2,281!
UNCOV
693
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
UNCOV
694
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
UNCOV
695
    return;
×
696
  }
697

698
  SMonVloadInfo vinfo = {0};
2,281✔
699
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
2,281✔
700
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
6,843✔
701
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
4,562✔
702
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
4,562!
UNCOV
703
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
704
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
705
               syncStr(pLoad->syncState));
×
706
      break;
×
707
    }
708
  }
709

710
  taosArrayDestroy(vinfo.pVloads);
2,281✔
711
}
712

713
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,281✔
714
  int32_t code = 0;
2,281✔
715
  dDebug("server run status req is received");
2,281!
716
  SServerStatusRsp statusRsp = {0};
2,281✔
717
  dmGetServerRunStatus(pMgmt, &statusRsp);
2,281✔
718

719
  pMsg->info.rsp = NULL;
2,281✔
720
  pMsg->info.rspLen = 0;
2,281✔
721

722
  SRpcMsg rspMsg = {.info = pMsg->info};
2,281✔
723
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
2,281✔
724
  if (rspLen < 0) {
2,281!
UNCOV
725
    return TSDB_CODE_OUT_OF_MEMORY;
×
726
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
727
    // return rspMsg.code;
728
  }
729

730
  void *pRsp = rpcMallocCont(rspLen);
2,281✔
731
  if (pRsp == NULL) {
2,281!
UNCOV
732
    return terrno;
×
733
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
734
    // return rspMsg.code;
735
  }
736

737
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
2,281✔
738
  if (rspLen < 0) {
2,281!
UNCOV
739
    return TSDB_CODE_INVALID_MSG;
×
740
  }
741

742
  pMsg->info.rsp = pRsp;
2,281✔
743
  pMsg->info.rspLen = rspLen;
2,281✔
744
  return 0;
2,281✔
745
}
746

747
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
174,531✔
748
  int32_t code = 0;
174,531✔
749

750
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
174,531!
751
  if (pBlock == NULL) {
174,531!
UNCOV
752
    return terrno;
×
753
  }
754

755
  size_t size = 0;
174,531✔
756

757
  const SSysTableMeta *pMeta = NULL;
174,531✔
758
  getInfosDbMeta(&pMeta, &size);
174,531✔
759

760
  int32_t index = 0;
174,531✔
761
  for (int32_t i = 0; i < size; ++i) {
3,490,620!
762
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
3,490,620!
763
      index = i;
174,531✔
764
      break;
174,531✔
765
    }
766
  }
767

768
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
174,531✔
769
  if (pBlock->pDataBlock == NULL) {
174,531!
UNCOV
770
    code = terrno;
×
UNCOV
771
    goto _exit;
×
772
  }
773

774
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
1,221,717✔
775
    SColumnInfoData colInfoData = {0};
1,047,186✔
776
    colInfoData.info.colId = i + 1;
1,047,186✔
777
    colInfoData.info.type = pMeta[index].schema[i].type;
1,047,186✔
778
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
1,047,186✔
779
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
2,094,372!
UNCOV
780
      code = terrno;
×
781
      goto _exit;
×
782
    }
783
  }
784

785
  pBlock->info.hasVarCol = true;
174,531✔
786
_exit:
174,531✔
787
  if (code != 0) {
174,531!
UNCOV
788
    blockDataDestroy(pBlock);
×
789
  } else {
790
    *ppBlock = pBlock;
174,531✔
791
  }
792
  return code;
174,531✔
793
}
794

795
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
174,531✔
796
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);
174,531✔
797
  if (code != 0) {
174,531!
UNCOV
798
    return code;
×
799
  }
800

801
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
174,531✔
802
  if (pColInfo == NULL) {
174,531!
UNCOV
803
    return TSDB_CODE_OUT_OF_RANGE;
×
804
  }
805

806
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
174,531✔
807
}
808

809
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
174,531✔
810
  int32_t           size = 0;
174,531✔
811
  int32_t           rowsRead = 0;
174,531✔
812
  int32_t           code = 0;
174,531✔
813
  SRetrieveTableReq retrieveReq = {0};
174,531✔
814
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
174,531!
UNCOV
815
    return TSDB_CODE_INVALID_MSG;
×
816
  }
817
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
174,531!
818
#if 0
819
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
820
    code = TSDB_CODE_MND_NO_RIGHTS;
821
    return code;
822
  }
823
#endif
824
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
174,531!
UNCOV
825
    return TSDB_CODE_INVALID_MSG;
×
826
  }
827

828
  SSDataBlock *pBlock = NULL;
174,531✔
829
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
174,531!
UNCOV
830
    return code;
×
831
  }
832

833
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
174,531✔
834
  if (code != 0) {
174,531!
UNCOV
835
    blockDataDestroy(pBlock);
×
836
    return code;
×
837
  }
838

839
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
174,531✔
840
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
174,531✔
841
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
174,531✔
842

843
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
174,531✔
844
  if (pRsp == NULL) {
174,531!
UNCOV
845
    code = terrno;
×
846
    dError("failed to retrieve data since %s", tstrerror(code));
×
847
    blockDataDestroy(pBlock);
×
UNCOV
848
    return code;
×
849
  }
850

851
  char *pStart = pRsp->data;
174,531✔
852
  *(int32_t *)pStart = htonl(numOfCols);
174,531✔
853
  pStart += sizeof(int32_t);  // number of columns
174,531✔
854

855
  for (int32_t i = 0; i < numOfCols; ++i) {
1,221,717✔
856
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
1,047,186✔
857
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
1,047,186✔
858

859
    pSchema->bytes = htonl(pColInfo->info.bytes);
1,047,186✔
860
    pSchema->colId = htons(pColInfo->info.colId);
1,047,186✔
861
    pSchema->type = pColInfo->info.type;
1,047,186✔
862

863
    pStart += sizeof(SSysTableSchema);
1,047,186✔
864
  }
865

866
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
174,531✔
867
  if (len < 0) {
174,531!
UNCOV
868
    dError("failed to retrieve data since %s", tstrerror(code));
×
UNCOV
869
    blockDataDestroy(pBlock);
×
UNCOV
870
    rpcFreeCont(pRsp);
×
UNCOV
871
    return terrno;
×
872
  }
873

874
  pRsp->numOfRows = htonl(pBlock->info.rows);
174,531✔
875
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
174,531✔
876
  pRsp->completed = 1;
174,531✔
877
  pMsg->info.rsp = pRsp;
174,531✔
878
  pMsg->info.rspLen = size;
174,531✔
879
  dDebug("dnode variables retrieve completed");
174,531!
880

881
  blockDataDestroy(pBlock);
174,531✔
882
  return TSDB_CODE_SUCCESS;
174,531✔
883
}
884

885
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
23,339,083✔
886
  SMStreamHbRspMsg rsp = {0};
23,339,083✔
887
  int32_t          code = 0;
23,339,083✔
888
  SDecoder         decoder;
23,329,934✔
889
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
23,339,083✔
890
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
23,339,083✔
891
  int64_t          currTs = taosGetTimestampMs();
23,339,083✔
892

893
  if (pMsg->code) {
23,339,083✔
894
    return streamHbHandleRspErr(pMsg->code, currTs);
383,221✔
895
  }
896

897
  tDecoderInit(&decoder, (uint8_t*)msg, len);
22,955,862✔
898
  code = tDecodeStreamHbRsp(&decoder, &rsp);
22,955,862✔
899
  if (code < 0) {
22,955,862!
UNCOV
900
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
901
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
UNCOV
902
    tDecoderClear(&decoder);
×
UNCOV
903
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
UNCOV
904
    return streamHbHandleRspErr(code, currTs);
×
905
  }
906

907
  tDecoderClear(&decoder);
22,955,862✔
908

909
  return streamHbProcessRspMsg(&rsp);
22,955,862✔
910
}
911

912

913
SArray *dmGetMsgHandles() {
1,206,900✔
914
  int32_t code = -1;
1,206,900✔
915
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
1,206,900✔
916
  if (pArray == NULL) {
1,206,900!
UNCOV
917
    return NULL;
×
918
  }
919

920
  // Requests handled by DNODE
921
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
922
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
923
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
924
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
925
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
926
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
927
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
928
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
929
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
930
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
931
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
932
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
933
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
934
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
935
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
936

937
  // Requests handled by MNODE
938
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
939
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
940
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,206,900!
941

942
  code = 0;
1,206,900✔
943

944
_OVER:
1,206,900✔
945
  if (code != 0) {
1,206,900!
UNCOV
946
    taosArrayDestroy(pArray);
×
UNCOV
947
    return NULL;
×
948
  } else {
949
    return pArray;
1,206,900✔
950
  }
951
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc