• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4886

16 Dec 2025 01:13AM UTC coverage: 65.292% (+0.03%) from 65.258%
#4886

push

travis-ci

web-flow
fix: compile error (#33938)

178718 of 273721 relevant lines covered (65.29%)

103311111.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.4
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
// #include "dmMgmt.h"
20
#include "monitor.h"
21
#include "stream.h"
22
#include "systable.h"
23
#include "tanalytics.h"
24
#include "tchecksum.h"
25
#include "tutil.h"
26

27
extern SConfig *tsCfg;
28

29
SMonVloadInfo tsVinfo = {0};
30
SMnodeLoad    tsMLoad = {0};
31
SDnodeData    tsDnodeData = {0};
32

33
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
2,486,272✔
34
  int32_t code = 0;
2,486,272✔
35
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
2,486,272✔
36
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
513,119✔
37
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
513,119✔
38
    pMgmt->pData->dnodeId = pCfg->dnodeId;
513,119✔
39
    pMgmt->pData->clusterId = pCfg->clusterId;
513,119✔
40
    monSetDnodeId(pCfg->dnodeId);
513,119✔
41
    auditSetDnodeId(pCfg->dnodeId);
513,119✔
42
    code = dmWriteEps(pMgmt->pData);
513,119✔
43
    if (code != 0) {
513,119✔
44
      dInfo("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
1,804✔
45
            tstrerror(code));
46
    }
47
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
513,119✔
48
  }
49
}
2,486,272✔
50

51
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,716,337✔
52
  int32_t code = 0;
2,716,337✔
53
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,716,337✔
54
  if (pMgmt->pData->ipWhiteVer == ver) {
2,716,337✔
55
    if (ver == 0) {
2,714,563✔
56
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
2,711,253✔
57
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,711,253✔
58
        dError("failed to disable ip white list on dnode");
×
59
      }
60
    }
61
    return;
2,714,563✔
62
  }
63
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
1,774✔
64

65
  SRetrieveWhiteListReq req = {.ver = oldVer};
1,774✔
66
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
1,774✔
67
  if (contLen < 0) {
1,774✔
68
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
69
    return;
×
70
  }
71
  void *pHead = rpcMallocCont(contLen);
1,774✔
72
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
1,774✔
73
  if (contLen < 0) {
1,774✔
74
    rpcFreeCont(pHead);
×
75
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
76
    return;
×
77
  }
78

79
  SRpcMsg rpcMsg = {.pCont = pHead,
1,774✔
80
                    .contLen = contLen,
81
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL,
82
                    .info.ahandle = 0,
83
                    .info.notFreeAhandle = 1,
84
                    .info.refId = 0,
85
                    .info.noResp = 0,
86
                    .info.handle = 0};
87
  SEpSet  epset = {0};
1,774✔
88

89
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
1,774✔
90

91
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
1,774✔
92
  if (code != 0) {
1,774✔
93
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
94
  }
95
}
96

97

98

99
static void dmMayShouldUpdateTimeWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
2,716,337✔
100
  int32_t code = 0;
2,716,337✔
101
  dDebug("time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,716,337✔
102
  if (pMgmt->pData->timeWhiteVer == ver) {
2,716,337✔
103
    if (ver == 0) {
2,714,563✔
104
      dDebug("disable time-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->timeWhiteVer, ver);
2,711,253✔
105
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
2,711,253✔
106
        dError("failed to disable time white list on dnode");
×
107
      }
108
    }
109
    return;
2,714,563✔
110
  }
111
  int64_t oldVer = pMgmt->pData->timeWhiteVer;
1,774✔
112

113
  SRetrieveWhiteListReq req = {.ver = oldVer};
1,774✔
114
  int32_t             contLen = tSerializeRetrieveWhiteListReq(NULL, 0, &req);
1,774✔
115
  if (contLen < 0) {
1,774✔
116
    dError("failed to serialize datetime white list request since: %s", tstrerror(contLen));
×
117
    return;
×
118
  }
119
  void *pHead = rpcMallocCont(contLen);
1,774✔
120
  contLen = tSerializeRetrieveWhiteListReq(pHead, contLen, &req);
1,774✔
121
  if (contLen < 0) {
1,774✔
122
    rpcFreeCont(pHead);
×
123
    dError("failed to serialize datetime white list request since:%s", tstrerror(contLen));
×
124
    return;
×
125
  }
126

127
  SRpcMsg rpcMsg = {.pCont = pHead,
1,774✔
128
                    .contLen = contLen,
129
                    .msgType = TDMT_MND_RETRIEVE_DATETIME_WHITELIST,
130
                    .info.ahandle = 0,
131
                    .info.notFreeAhandle = 1,
132
                    .info.refId = 0,
133
                    .info.noResp = 0,
134
                    .info.handle = 0};
135
  SEpSet  epset = {0};
1,774✔
136

137
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
1,774✔
138

139
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
1,774✔
140
  if (code != 0) {
1,774✔
141
    dError("failed to send retrieve datetime white list request since:%s", tstrerror(code));
×
142
  }
143
}
144

145

146

147
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
2,716,337✔
148
  int32_t code = 0;
2,716,337✔
149
  int64_t oldVer = taosAnalyGetVersion();
2,716,337✔
150
  if (oldVer == newVer) return;
2,716,337✔
151
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
152

153
  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
154
  int32_t              contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
×
155
  if (contLen < 0) {
×
156
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
157
    return;
×
158
  }
159

160
  void *pHead = rpcMallocCont(contLen);
×
161
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
×
162
  if (contLen < 0) {
×
163
    rpcFreeCont(pHead);
×
164
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
165
    return;
×
166
  }
167

168
  SRpcMsg rpcMsg = {
×
169
      .pCont = pHead,
170
      .contLen = contLen,
171
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
172
      .info.ahandle = 0,
173
      .info.refId = 0,
174
      .info.noResp = 0,
175
      .info.handle = 0,
176
  };
177
  SEpSet epset = {0};
×
178

179
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
180

181
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
182
  if (code != 0) {
×
183
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
184
  }
185
}
186

187
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
40,651,627✔
188
  const STraceId *trace = &pRsp->info.traceId;
40,651,627✔
189
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
40,651,627✔
190

191
  if (pRsp->code != 0) {
40,651,627✔
192
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
529,607✔
193
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
×
194
             pMgmt->statusSeq);
195
      pMgmt->pData->dropped = 1;
×
196
      if (dmWriteEps(pMgmt->pData) != 0) {
×
197
        dError("failed to write dnode file");
×
198
      }
199
      dInfo("dnode will exit since it is in the dropped state");
×
200
      (void)raise(SIGINT);
×
201
    }
202
  } else {
203
    SStatusRsp statusRsp = {0};
40,122,020✔
204
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
42,838,357✔
205
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
2,716,337✔
206
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
2,716,337✔
207
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
2,486,272✔
208
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
209
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
2,486,272✔
210
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
2,486,272✔
211
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
2,486,272✔
212
      }
213
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
2,716,337✔
214
      dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer);
2,716,337✔
215
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
2,716,337✔
216
    }
217
    tFreeSStatusRsp(&statusRsp);
40,122,020✔
218
  }
219
  rpcFreeCont(pRsp->pCont);
40,651,627✔
220
}
40,651,627✔
221

222
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
40,750,130✔
223
  int32_t    code = 0;
40,750,130✔
224
  SStatusReq req = {0};
40,750,130✔
225
  req.timestamp = taosGetTimestampMs();
40,750,130✔
226
  pMgmt->statusSeq++;
40,750,130✔
227

228
  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
40,750,130✔
229
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
40,750,130✔
230
    dError("failed to lock status info lock");
×
231
    return;
×
232
  }
233

234
  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
40,750,130✔
235
  req.sver = tsVersion;
40,750,130✔
236
  req.dnodeVer = tsDnodeData.dnodeVer;
40,750,130✔
237
  req.dnodeId = tsDnodeData.dnodeId;
40,750,130✔
238
  req.clusterId = tsDnodeData.clusterId;
40,750,130✔
239
  if (req.clusterId == 0) req.dnodeId = 0;
40,750,130✔
240
  req.rebootTime = tsDnodeData.rebootTime;
40,750,130✔
241
  req.updateTime = tsDnodeData.updateTime;
40,750,130✔
242
  req.numOfCores = tsNumOfCores;
40,750,130✔
243
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
40,750,130✔
244
  req.numOfDiskCfg = tsDiskCfgNum;
40,750,130✔
245
  req.memTotal = tsTotalMemoryKB * 1024;
40,750,130✔
246
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
40,750,130✔
247
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
40,750,130✔
248
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
40,750,130✔
249

250
  req.clusterCfg.statusInterval = tsStatusInterval;
40,750,130✔
251
  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
40,750,130✔
252
  req.clusterCfg.checkTime = 0;
40,750,130✔
253
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
40,750,130✔
254
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
40,750,130✔
255
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
40,750,130✔
256
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
40,750,130✔
257
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
40,750,130✔
258
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
40,750,130✔
259
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
40,750,130✔
260
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
40,750,130✔
261
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
40,750,130✔
262
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
40,750,130✔
263
  char timestr[32] = "1970-01-01 00:00:00.00";
40,750,130✔
264
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL) !=
40,750,130✔
265
      0) {
266
    dError("failed to parse time since %s", tstrerror(code));
×
267
  }
268
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
40,750,130✔
269
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
40,750,130✔
270
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
40,750,130✔
271

272
  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);
40,750,130✔
273

274
  req.pVloads = tsVinfo.pVloads;
40,750,130✔
275
  tsVinfo.pVloads = NULL;
40,750,130✔
276

277
  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
40,750,130✔
278
  req.mload = tsMLoad;
40,750,130✔
279

280
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
40,750,130✔
281
    dError("failed to unlock status info lock");
×
282
    return;
×
283
  }
284

285
  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
40,750,130✔
286
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
40,750,130✔
287

288
  req.statusSeq = pMgmt->statusSeq;
40,750,130✔
289
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
40,750,130✔
290
  req.analVer = taosAnalyGetVersion();
40,750,130✔
291
  req.timeWhiteVer = pMgmt->pData->timeWhiteVer;
40,750,130✔
292

293
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
40,750,130✔
294
  if (contLen < 0) {
40,750,130✔
295
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
296
    return;
×
297
  }
298

299
  void *pHead = rpcMallocCont(contLen);
40,750,130✔
300
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
40,750,130✔
301
  if (contLen < 0) {
40,750,130✔
302
    rpcFreeCont(pHead);
×
303
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
304
    return;
×
305
  }
306
  tFreeSStatusReq(&req);
40,750,130✔
307

308
  SRpcMsg rpcMsg = {.pCont = pHead,
40,750,130✔
309
                    .contLen = contLen,
310
                    .msgType = TDMT_MND_STATUS,
311
                    .info.ahandle = 0,
312
                    .info.notFreeAhandle = 1,
313
                    .info.refId = 0,
314
                    .info.noResp = 0,
315
                    .info.handle = 0};
316
  SRpcMsg rpcRsp = {0};
40,750,130✔
317

318
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
40,750,130✔
319

320
  SEpSet epSet = {0};
40,750,130✔
321
  int8_t epUpdated = 0;
40,750,130✔
322
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
40,750,130✔
323

324
  if (dDebugFlag & DEBUG_TRACE) {
40,750,130✔
325
    char tbuf[512];
1,156,170✔
326
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
1,156,170✔
327
    dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d to %s", pMgmt->statusSeq, tbuf);
1,156,170✔
328
  }
329
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
40,750,130✔
330
  if (code != 0) {
40,750,130✔
331
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
98,503✔
332
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
98,503✔
333
      dmRotateMnodeEpSet(pMgmt->pData);
98,503✔
334
      char tbuf[512];
98,503✔
335
      dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
98,503✔
336
      dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
98,503✔
337
            tbuf, epSet.inUse);
338
    }
339
    return;
98,503✔
340
  }
341

342
  if (rpcRsp.code != 0) {
40,651,627✔
343
    dmRotateMnodeEpSet(pMgmt->pData);
529,607✔
344
    char tbuf[512];
529,607✔
345
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
529,607✔
346
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
529,607✔
347
          tbuf, epSet.inUse);
348
  } else {
349
    if (epUpdated == 1) {
40,122,020✔
350
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
282,412✔
351
    }
352
  }
353
  dmProcessStatusRsp(pMgmt, &rpcRsp);
40,651,627✔
354
}
355

356
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
685,861✔
357
  const STraceId *trace = &pRsp->info.traceId;
685,861✔
358
  int32_t         code = 0;
685,861✔
359
  SConfigRsp      configRsp = {0};
685,861✔
360
  bool            needStop = false;
685,861✔
361

362
  if (pRsp->code != 0) {
685,861✔
363
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
364
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
365
      pMgmt->pData->dropped = 1;
×
366
      if (dmWriteEps(pMgmt->pData) != 0) {
×
367
        dError("failed to write dnode file");
×
368
      }
369
      dInfo("dnode will exit since it is in the dropped state");
×
370
      (void)raise(SIGINT);
×
371
    }
372
  } else {
373
    bool needUpdate = false;
685,861✔
374
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
1,371,722✔
375
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
685,861✔
376
      // Try to use cfg from mnode sdb.
377
      if (!configRsp.isVersionVerified) {
685,861✔
378
        uInfo("config version not verified, update config");
516,011✔
379
        needUpdate = true;
516,011✔
380
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
516,011✔
381
        if (code != TSDB_CODE_SUCCESS) {
516,011✔
382
          dError("failed to persist global config since %s", tstrerror(code));
×
383
          goto _exit;
×
384
        }
385
      }
386
    }
387
    if (needUpdate) {
685,861✔
388
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
516,011✔
389
      if (code != TSDB_CODE_SUCCESS) {
516,011✔
390
        dError("failed to update config since %s", tstrerror(code));
×
391
        goto _exit;
×
392
      }
393
      code = setAllConfigs(tsCfg);
516,011✔
394
      if (code != TSDB_CODE_SUCCESS) {
516,011✔
395
        dError("failed to set all configs since %s", tstrerror(code));
×
396
        goto _exit;
×
397
      }
398
    }
399
    code = taosPersistLocalConfig(pMgmt->path);
685,861✔
400
    if (code != TSDB_CODE_SUCCESS) {
685,861✔
401
      dError("failed to persist local config since %s", tstrerror(code));
×
402
    }
403
    tsConfigInited = 1;
685,861✔
404
  }
405
_exit:
685,861✔
406
  tFreeSConfigRsp(&configRsp);
685,861✔
407
  rpcFreeCont(pRsp->pCont);
685,861✔
408
  if (needStop) {
685,861✔
409
    dmStop();
×
410
  }
411
}
685,861✔
412

413
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
689,531✔
414
  int32_t    code = 0;
689,531✔
415
  SConfigReq req = {0};
689,531✔
416

417
  req.cver = tsdmConfigVersion;
689,531✔
418
  req.forceReadConfig = tsForceReadConfig;
689,531✔
419
  req.array = taosGetGlobalCfg(tsCfg);
689,531✔
420
  dDebug("send config req to mnode, configVersion:%d", req.cver);
689,531✔
421

422
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
689,531✔
423
  if (contLen < 0) {
689,531✔
424
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
425
    return;
×
426
  }
427

428
  void *pHead = rpcMallocCont(contLen);
689,531✔
429
  if (pHead == NULL) {
689,531✔
430
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
431
    return;
×
432
  }
433
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
689,531✔
434
  if (contLen < 0) {
689,531✔
435
    rpcFreeCont(pHead);
×
436
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
437
    return;
×
438
  }
439

440
  SRpcMsg rpcMsg = {.pCont = pHead,
689,531✔
441
                    .contLen = contLen,
442
                    .msgType = TDMT_MND_CONFIG,
443
                    .info.ahandle = 0,
444
                    .info.notFreeAhandle = 1,
445
                    .info.refId = 0,
446
                    .info.noResp = 0,
447
                    .info.handle = 0};
448
  SRpcMsg rpcRsp = {0};
689,531✔
449

450
  SEpSet epSet = {0};
689,531✔
451
  int8_t epUpdated = 0;
689,531✔
452
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
689,531✔
453

454
  dDebug("send config req to mnode, configSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
689,531✔
455
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
689,531✔
456
  if (code != 0) {
689,531✔
457
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
3,670✔
458
    return;
3,670✔
459
  }
460
  if (rpcRsp.code != 0) {
685,861✔
461
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
462
    return;
×
463
  }
464
  dmProcessConfigRsp(pMgmt, &rpcRsp);
685,861✔
465
}
466

467
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
41,533,785✔
468
  dDebug("begin to get dnode info");
41,533,785✔
469
  SDnodeData dnodeData = {0};
41,533,785✔
470
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
41,533,785✔
471
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
41,533,785✔
472
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
41,533,785✔
473
  dnodeData.clusterId = pMgmt->pData->clusterId;
41,533,785✔
474
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
41,533,785✔
475
  dnodeData.updateTime = pMgmt->pData->updateTime;
41,533,785✔
476
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
41,533,785✔
477
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
41,533,785✔
478

479
  dDebug("begin to get vnode loads");
41,533,785✔
480
  SMonVloadInfo vinfo = {0};
41,533,785✔
481
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads
41,533,785✔
482

483
  dDebug("begin to get mnode loads");
41,533,785✔
484
  SMonMloadInfo minfo = {0};
41,533,785✔
485
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads
41,533,785✔
486

487
  dDebug("begin to lock status info");
41,533,785✔
488
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
41,533,785✔
489
    dError("failed to lock status info lock");
×
490
    return;
×
491
  }
492
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
41,533,785✔
493
  tsDnodeData.dnodeId = dnodeData.dnodeId;
41,533,785✔
494
  tsDnodeData.clusterId = dnodeData.clusterId;
41,533,785✔
495
  tsDnodeData.rebootTime = dnodeData.rebootTime;
41,533,785✔
496
  tsDnodeData.updateTime = dnodeData.updateTime;
41,533,785✔
497
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
41,533,785✔
498

499
  if (tsVinfo.pVloads == NULL) {
41,533,785✔
500
    tsVinfo.pVloads = vinfo.pVloads;
40,202,870✔
501
    vinfo.pVloads = NULL;
40,202,870✔
502
  } else {
503
    taosArrayDestroy(vinfo.pVloads);
1,330,915✔
504
    vinfo.pVloads = NULL;
1,330,915✔
505
  }
506

507
  tsMLoad = minfo.load;
41,533,785✔
508

509
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
41,533,785✔
510
    dError("failed to unlock status info lock");
×
511
    return;
×
512
  }
513
}
514

515
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
516
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
517
  if (contLen < 0) {
×
518
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
519
    return;
×
520
  }
521
  void *pHead = rpcMallocCont(contLen);
×
522
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
523
  if (contLen < 0) {
×
524
    rpcFreeCont(pHead);
×
525
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
526
    return;
×
527
  }
528

529
  SRpcMsg rpcMsg = {.pCont = pHead,
×
530
                    .contLen = contLen,
531
                    .msgType = TDMT_MND_NOTIFY,
532
                    .info.ahandle = 0,
533
                    .info.notFreeAhandle = 1,
534
                    .info.refId = 0,
535
                    .info.noResp = 1,
536
                    .info.handle = 0};
537

538
  SEpSet epSet = {0};
×
539
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
540
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
541
    dError("failed to send notify req");
×
542
  }
543
}
544

545
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
546
  dError("auth rsp is received, but not supported yet");
×
547
  return 0;
×
548
}
549

550
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
551
  dError("grant rsp is received, but not supported yet");
×
552
  return 0;
×
553
}
554

555
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
84,628✔
556
  int32_t       code = 0;
84,628✔
557
  SDCfgDnodeReq cfgReq = {0};
84,628✔
558
  SConfig      *pCfg = taosGetCfg();
84,628✔
559
  SConfigItem  *pItem = NULL;
84,628✔
560

561
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
84,628✔
562
    return TSDB_CODE_INVALID_MSG;
×
563
  }
564
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
84,628✔
565
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
518✔
566
  }
567

568
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
84,110✔
569

570
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
84,110✔
571
  if (code != 0) {
84,110✔
572
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
463✔
573
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
463✔
574
      return TSDB_CODE_SUCCESS;
463✔
575
    } else {
576
      return code;
×
577
    }
578
  }
579
  if (pItem == NULL) {
83,647✔
580
    return TSDB_CODE_CFG_NOT_FOUND;
×
581
  }
582

583
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
83,647✔
584
    char value[10] = {0};
×
585
    if (sscanf(cfgReq.value, "%d", &tsSyncTimeout) != 1) {
×
586
      tsSyncTimeout = 0;
×
587
    }
588

589
    if (tsSyncTimeout > 0) {
×
590
      SConfigItem *pItemTmp = NULL;
×
591
      char         tmp[10] = {0};
×
592

593
      sprintf(tmp, "%d", tsSyncTimeout);
×
594
      TAOS_CHECK_RETURN(
×
595
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
596
      if (pItemTmp == NULL) {
×
597
        return TSDB_CODE_CFG_NOT_FOUND;
×
598
      }
599

600
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
601
      TAOS_CHECK_RETURN(
×
602
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
603
      if (pItemTmp == NULL) {
×
604
        return TSDB_CODE_CFG_NOT_FOUND;
×
605
      }
606
      TAOS_CHECK_RETURN(
×
607
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
608
      if (pItemTmp == NULL) {
×
609
        return TSDB_CODE_CFG_NOT_FOUND;
×
610
      }
611

612
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 2);
×
613
      TAOS_CHECK_RETURN(
×
614
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
615
      if (pItemTmp == NULL) {
×
616
        return TSDB_CODE_CFG_NOT_FOUND;
×
617
      }
618
      TAOS_CHECK_RETURN(
×
619
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
620
      if (pItemTmp == NULL) {
×
621
        return TSDB_CODE_CFG_NOT_FOUND;
×
622
      }
623
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
624
      if (pItemTmp == NULL) {
×
625
        return TSDB_CODE_CFG_NOT_FOUND;
×
626
      }
627

628
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
629
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
630
      if (pItemTmp == NULL) {
×
631
        return TSDB_CODE_CFG_NOT_FOUND;
×
632
      }
633

634
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 8);
×
635
      TAOS_CHECK_RETURN(
×
636
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
637
      if (pItemTmp == NULL) {
×
638
        return TSDB_CODE_CFG_NOT_FOUND;
×
639
      }
640
      TAOS_CHECK_RETURN(
×
641
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
642
      if (pItemTmp == NULL) {
×
643
        return TSDB_CODE_CFG_NOT_FOUND;
×
644
      }
645
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
646
      if (pItemTmp == NULL) {
×
647
        return TSDB_CODE_CFG_NOT_FOUND;
×
648
      }
649

650
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
651
            tsSyncTimeout);
652
    }
653
  }
654

655
  if (!isConifgItemLazyMode(pItem)) {
83,647✔
656
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
82,774✔
657

658
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
82,774✔
659
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
660
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
661
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
662

663
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
664
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
665
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
666
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
667

668
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
669
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
670
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
671

672
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
673
            tsSyncTimeout);
674
    }
675
  }
676

677
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
83,647✔
678
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
14,734✔
679
    if (code != TSDB_CODE_SUCCESS) {
14,734✔
680
      dError("failed to persist global config since %s", tstrerror(code));
×
681
    }
682
  } else {
683
    code = taosPersistLocalConfig(pMgmt->path);
68,913✔
684
    if (code != TSDB_CODE_SUCCESS) {
68,913✔
685
      dError("failed to persist local config since %s", tstrerror(code));
×
686
    }
687
  }
688

689
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
83,647✔
690
    dInfo("finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
×
691

692
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
693
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
694
  }
695

696
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 ||
83,647✔
697
      taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
83,647✔
698
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
699
  }
700

701
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 ||
83,647✔
702
      taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
83,647✔
703
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
704
  }
705

706
  if (cfgReq.version > 0) {
83,647✔
707
    tsdmConfigVersion = cfgReq.version;
29,234✔
708
  }
709
  return code;
83,647✔
710
}
711

712
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,844✔
713
#ifdef TD_ENTERPRISE
714
  int32_t       code = 0;
2,844✔
715
  SDCfgDnodeReq cfgReq = {0};
2,844✔
716
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
2,844✔
717
    code = TSDB_CODE_INVALID_MSG;
×
718
    goto _exit;
×
719
  }
720

721
  code = dmUpdateEncryptKey(cfgReq.value, true);
2,844✔
722
  if (code == 0) {
2,844✔
723
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
2,844✔
724
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
2,844✔
725
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
2,844✔
726
  }
727

728
_exit:
2,844✔
729
  pMsg->code = code;
2,844✔
730
  pMsg->info.rsp = NULL;
2,844✔
731
  pMsg->info.rspLen = 0;
2,844✔
732
  return code;
2,844✔
733
#else
734
  return 0;
735
#endif
736
}
737

738
int32_t dmProcessReloadTlsConfig(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
739
  int32_t code = 0;
×
740
  int32_t lino = 0;
×
741
  SMsgCb *msgCb = &pMgmt->msgCb;
×
742
  void *pTransCli = msgCb->clientRpc;
×
743
  void *pTransStatus = msgCb->statusRpc;  
×
744
  void *pTransSync = msgCb->syncRpc; 
×
745
  void *pTransServer = msgCb->serverRpc;
×
746

747
  code = rpcReloadTlsConfig(pTransServer, TAOS_CONN_SERVER);
×
748
  if (code != 0) {
×
749
    dError("failed to reload tls config for transport %s since %s", "server", tstrerror(code));
×
750
    goto _error;
×
751
  }
752

753
  code = rpcReloadTlsConfig(pTransCli, TAOS_CONN_CLIENT);
×
754
  if (code != 0) {
×
755
    dError("failed to reload tls config for transport %s since %s", "cli", tstrerror(code));
×
756
    goto _error;
×
757
  }
758

759
  code = rpcReloadTlsConfig(pTransStatus, TAOS_CONN_CLIENT);
×
760
  if (code != 0) {
×
761
    dError("failed to reload tls config for transport %s since %s", "status-cli", tstrerror(code));
×
762
    goto _error;
×
763
  }
764

765
  code = rpcReloadTlsConfig(pTransSync, TAOS_CONN_CLIENT);
×
766
  if (code != 0) {
×
767
    dError("failed to reload tls config for transport %s since %s", "sync-cli", tstrerror(code));
×
768
    goto _error;
×
769
  }
770

771
_error:
×
772
  
773
  return code;
×
774
}
775

776
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
808✔
777
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
808✔
778
  pStatus->details[0] = 0;
808✔
779

780
  SMonMloadInfo minfo = {0};
808✔
781
  (*pMgmt->getMnodeLoadsFp)(&minfo);
808✔
782
  if (minfo.isMnode &&
808✔
783
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
808✔
784
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
785
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
786
    return;
×
787
  }
788

789
  SMonVloadInfo vinfo = {0};
808✔
790
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
808✔
791
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
2,424✔
792
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
1,616✔
793
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
1,616✔
794
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
795
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
796
               syncStr(pLoad->syncState));
×
797
      break;
×
798
    }
799
  }
800

801
  taosArrayDestroy(vinfo.pVloads);
808✔
802
}
803

804
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
808✔
805
  int32_t code = 0;
808✔
806
  dDebug("server run status req is received");
808✔
807
  SServerStatusRsp statusRsp = {0};
808✔
808
  dmGetServerRunStatus(pMgmt, &statusRsp);
808✔
809

810
  pMsg->info.rsp = NULL;
808✔
811
  pMsg->info.rspLen = 0;
808✔
812

813
  SRpcMsg rspMsg = {.info = pMsg->info};
808✔
814
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
808✔
815
  if (rspLen < 0) {
808✔
816
    return TSDB_CODE_OUT_OF_MEMORY;
×
817
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
818
    // return rspMsg.code;
819
  }
820

821
  void *pRsp = rpcMallocCont(rspLen);
808✔
822
  if (pRsp == NULL) {
808✔
823
    return terrno;
×
824
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
825
    // return rspMsg.code;
826
  }
827

828
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
808✔
829
  if (rspLen < 0) {
808✔
830
    return TSDB_CODE_INVALID_MSG;
×
831
  }
832

833
  pMsg->info.rsp = pRsp;
808✔
834
  pMsg->info.rspLen = rspLen;
808✔
835
  return 0;
808✔
836
}
837

838
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
70,772✔
839
  int32_t code = 0;
70,772✔
840

841
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
70,772✔
842
  if (pBlock == NULL) {
70,772✔
843
    return terrno;
×
844
  }
845

846
  size_t size = 0;
70,772✔
847

848
  const SSysTableMeta *pMeta = NULL;
70,772✔
849
  getInfosDbMeta(&pMeta, &size);
70,772✔
850

851
  int32_t index = 0;
70,772✔
852
  for (int32_t i = 0; i < size; ++i) {
1,415,440✔
853
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
1,415,440✔
854
      index = i;
70,772✔
855
      break;
70,772✔
856
    }
857
  }
858

859
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
70,772✔
860
  if (pBlock->pDataBlock == NULL) {
70,772✔
861
    code = terrno;
×
862
    goto _exit;
×
863
  }
864

865
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
495,404✔
866
    SColumnInfoData colInfoData = {0};
424,632✔
867
    colInfoData.info.colId = i + 1;
424,632✔
868
    colInfoData.info.type = pMeta[index].schema[i].type;
424,632✔
869
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
424,632✔
870
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
849,264✔
871
      code = terrno;
×
872
      goto _exit;
×
873
    }
874
  }
875

876
  pBlock->info.hasVarCol = true;
70,772✔
877
_exit:
70,772✔
878
  if (code != 0) {
70,772✔
879
    blockDataDestroy(pBlock);
×
880
  } else {
881
    *ppBlock = pBlock;
70,772✔
882
  }
883
  return code;
70,772✔
884
}
885

886
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
70,772✔
887
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);
70,772✔
888
  if (code != 0) {
70,772✔
889
    return code;
×
890
  }
891

892
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
70,772✔
893
  if (pColInfo == NULL) {
70,772✔
894
    return TSDB_CODE_OUT_OF_RANGE;
×
895
  }
896

897
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, 1, false);
70,772✔
898
}
899

900
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
70,772✔
901
  int32_t           size = 0;
70,772✔
902
  int32_t           rowsRead = 0;
70,772✔
903
  int32_t           code = 0;
70,772✔
904
  SRetrieveTableReq retrieveReq = {0};
70,772✔
905
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
70,772✔
906
    return TSDB_CODE_INVALID_MSG;
×
907
  }
908
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
70,772✔
909
#if 0
910
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
911
    code = TSDB_CODE_MND_NO_RIGHTS;
912
    return code;
913
  }
914
#endif
915
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
70,772✔
916
    return TSDB_CODE_INVALID_MSG;
×
917
  }
918

919
  SSDataBlock *pBlock = NULL;
70,772✔
920
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
70,772✔
921
    return code;
×
922
  }
923

924
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
70,772✔
925
  if (code != 0) {
70,772✔
926
    blockDataDestroy(pBlock);
×
927
    return code;
×
928
  }
929

930
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
70,772✔
931
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
70,772✔
932
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
70,772✔
933

934
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
70,772✔
935
  if (pRsp == NULL) {
70,772✔
936
    code = terrno;
×
937
    dError("failed to retrieve data since %s", tstrerror(code));
×
938
    blockDataDestroy(pBlock);
×
939
    return code;
×
940
  }
941

942
  char *pStart = pRsp->data;
70,772✔
943
  *(int32_t *)pStart = htonl(numOfCols);
70,772✔
944
  pStart += sizeof(int32_t);  // number of columns
70,772✔
945

946
  for (int32_t i = 0; i < numOfCols; ++i) {
495,404✔
947
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
424,632✔
948
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
424,632✔
949

950
    pSchema->bytes = htonl(pColInfo->info.bytes);
424,632✔
951
    pSchema->colId = htons(pColInfo->info.colId);
424,632✔
952
    pSchema->type = pColInfo->info.type;
424,632✔
953

954
    pStart += sizeof(SSysTableSchema);
424,632✔
955
  }
956

957
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
70,772✔
958
  if (len < 0) {
70,772✔
959
    dError("failed to retrieve data since %s", tstrerror(code));
×
960
    blockDataDestroy(pBlock);
×
961
    rpcFreeCont(pRsp);
×
962
    return terrno;
×
963
  }
964

965
  pRsp->numOfRows = htonl(pBlock->info.rows);
70,772✔
966
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
70,772✔
967
  pRsp->completed = 1;
70,772✔
968
  pMsg->info.rsp = pRsp;
70,772✔
969
  pMsg->info.rspLen = size;
70,772✔
970
  dDebug("dnode variables retrieve completed");
70,772✔
971

972
  blockDataDestroy(pBlock);
70,772✔
973
  return TSDB_CODE_SUCCESS;
70,772✔
974
}
975

976
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
14,588,059✔
977
  SMStreamHbRspMsg rsp = {0};
14,588,059✔
978
  int32_t          code = 0;
14,588,059✔
979
  SDecoder         decoder;
14,584,699✔
980
  char*            msg = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
14,588,059✔
981
  int32_t          len = pMsg->contLen - sizeof(SStreamMsgGrpHeader);
14,588,059✔
982
  int64_t          currTs = taosGetTimestampMs();
14,588,059✔
983

984
  if (pMsg->code) {
14,588,059✔
985
    return streamHbHandleRspErr(pMsg->code, currTs);
217,757✔
986
  }
987

988
  tDecoderInit(&decoder, (uint8_t*)msg, len);
14,370,302✔
989
  code = tDecodeStreamHbRsp(&decoder, &rsp);
14,370,302✔
990
  if (code < 0) {
14,370,302✔
991
    code = TSDB_CODE_INVALID_MSG;
×
992
    tDeepFreeSMStreamHbRspMsg(&rsp);
×
993
    tDecoderClear(&decoder);
×
994
    dError("fail to decode stream hb rsp msg, error:%s", tstrerror(code));
×
995
    return streamHbHandleRspErr(code, currTs);
×
996
  }
997

998
  tDecoderClear(&decoder);
14,370,302✔
999

1000
  return streamHbProcessRspMsg(&rsp);
14,370,302✔
1001
}
1002

1003

1004
SArray *dmGetMsgHandles() {
689,095✔
1005
  int32_t code = -1;
689,095✔
1006
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
689,095✔
1007
  if (pArray == NULL) {
689,095✔
1008
    return NULL;
×
1009
  }
1010

1011
  // Requests handled by DNODE
1012
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1013
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1014
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1015
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1016
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1017
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1018
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1019
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1020
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1021
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1022
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1023
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1024
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1025
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1026
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1027
  if (dmSetMgmtHandle(pArray, TDMT_DND_RELOAD_DNODE_TLS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1028

1029
  // Requests handled by MNODE
1030
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1031
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1032
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
689,095✔
1033

1034
  code = 0;
689,095✔
1035

1036
_OVER:
689,095✔
1037
  if (code != 0) {
689,095✔
1038
    taosArrayDestroy(pArray);
×
1039
    return NULL;
×
1040
  } else {
1041
    return pArray;
689,095✔
1042
  }
1043
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc