• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.54
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24
#include "stream.h"
25

26
extern SConfig *tsCfg;
27

28
SMonVloadInfo tsVinfo = {0};
29
SMnodeLoad    tsMLoad = {0};
30
SDnodeData    tsDnodeData = {0};
31

32
/**
 * Adopt the dnode id and cluster id carried in a status response.
 *
 * Does nothing unless the locally stored dnodeId or clusterId is still 0.
 * Otherwise, under the pData write lock, it stores both ids, forwards the
 * dnode id to the monitor and audit modules and persists the result with
 * dmWriteEps().
 *
 * @param pMgmt dnode management context whose pData is updated.
 * @param pCfg  dnode configuration received from the mnode.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // Failing to persist the ids is an error condition, so report it at error level.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
9,199✔
49

50
/**
 * Compare the local ip-white-list version with the version from a status
 * response and request a fresh list from the mnode when they differ.
 *
 * When both versions are equal nothing is requested; if that common version
 * is 0 the white list is cleared on the server rpc handle.
 *
 * @param pMgmt dnode management context.
 * @param ver   ip-white-list version reported in the status response.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64, pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Never serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc cont for ip white list request since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE_DUAL,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
95

96
/**
 * Request the analytics algorithm list from the mnode when the version in a
 * status response differs from the locally known one.
 *
 * @param pMgmt  dnode management context.
 * @param newVer analytics version reported in the status response.
 */
static void dmMayShouldUpdateAnalyticsFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t                   contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Never serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc cont for analysis function ver request since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
135

136
/**
 * Handle the mnode's reply to a status request.
 *
 * On success the body (if any) is decoded; a changed dnodeVer triggers an
 * update of the local dnode config and endpoint list, and the ip-white-list
 * and analytics versions are then reconciled. On TSDB_CODE_MND_DNODE_NOT_EXIST
 * for a dnode that has an id and is not yet marked dropped, the dnode is
 * marked dropped, persisted and SIGINT is raised. The response content is
 * released in every case.
 *
 * @param pMgmt dnode management context.
 * @param pRsp  response message; its pCont is freed here.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasBody = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasBody && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer);
    }
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
96,395✔
169

170
/**
 * Build a status request from the cached dnode/vnode/mnode information,
 * send it to the mnode synchronously and process the reply.
 *
 * The cached data (tsDnodeData, tsVinfo, tsMLoad) is read under
 * pData->statusInfolock. Ownership of tsVinfo.pVloads moves into the request,
 * so the request is released with tFreeSStatusReq() on every exit path that
 * follows that hand-over.
 *
 * @param pMgmt dnode management context.
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dTrace("send status req to mnode, begin to mgnt statusInfolock, statusSeq:%d", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dTrace("send status req to mnode, begin to get dnode info, statusSeq:%d", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusIntervalMs = tsStatusIntervalMs;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the log below reports the real failure instead of code 0.
  // NOTE(review): assumes taosParseTime returns a code tstrerror() understands - confirm.
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dTrace("send status req to mnode, begin to get vnode loads, statusSeq:%d", pMgmt->statusSeq);

  // Take ownership of the cached vnode loads; from here on req must be freed.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dTrace("send status req to mnode, begin to get mnode loads, statusSeq:%d", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dTrace("send status req to mnode, begin to get qnode loads, statusSeq:%d", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req since %s", tstrerror(terrno));
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar members of req are read after tFreeSStatusReq().
  dDebug("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dTrace("send status req to mnode, begin to send rpc msg, statusSeq:%d", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
290

291
/**
 * Handle the mnode's reply to a config request.
 *
 * On TSDB_CODE_MND_DNODE_NOT_EXIST for a dnode that has an id and is not yet
 * marked dropped, the dnode is marked dropped, persisted and SIGINT is raised.
 * On success, when the reply says the config version was not verified, the
 * received global config is persisted and applied to tsCfg; the local config
 * is then persisted and tsConfigInited is set. Any failure while persisting
 * or applying the global config skips the remaining steps. The decoded reply
 * and the response content are released in every case.
 *
 * @param pMgmt dnode management context.
 * @param pRsp  response message; its pCont is freed here.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  const STraceId *trace = &pRsp->info.traceId;
  int32_t         code = 0;
  SConfigRsp      configRsp = {0};
  bool            needUpdate = false;

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
      tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
    // Try to use cfg from mnode sdb.
    if (!configRsp.isVersionVerified) {
      uInfo("config version not verified, update config");
      needUpdate = true;
      code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
    }
  }

  if (needUpdate) {
    code = cfgUpdateFromArray(tsCfg, configRsp.array);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to update config since %s", tstrerror(code));
      goto _exit;
    }
    code = setAllConfigs(tsCfg);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to set all configs since %s", tstrerror(code));
      goto _exit;
    }
  }

  // A local persist failure is logged only; the config is still considered initialized.
  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
}
2,377✔
347

348
/**
 * Send the local config version and global config to the mnode
 * synchronously and process the reply with dmProcessConfigRsp().
 *
 * @param pMgmt dnode management context.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a length here, not an error code, so report terrno instead.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // The reply is not handed to dmProcessConfigRsp() on this path, so release its content here.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
401

402
/**
 * Refresh the cached status data (tsDnodeData, tsVinfo, tsMLoad) that
 * dmSendStatusReq() later reads.
 *
 * The dnode identity is snapshotted under the pData read lock, vnode and
 * mnode loads are collected through the registered callbacks, and the
 * results are published under pData->statusInfolock. Freshly collected vnode
 * loads are kept only when the cache slot is empty; otherwise they are
 * destroyed.
 *
 * @param pMgmt dnode management context.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected vnode loads were never published, so release them here.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Cache slot is empty: hand the new array over.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // A previous array has not been consumed yet: drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
449

450
/**
 * Serialize a notify request and send it to the mnode without waiting for a
 * response (info.noResp is set).
 *
 * @param pMgmt dnode management context.
 * @param pReq  notify request to serialize.
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Never serialize into a NULL buffer when the allocation failed.
    dError("failed to malloc cont for notify req since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
479

480
/**
 * Placeholder handler for auth responses: logs that the message is not
 * supported and reports success.
 *
 * @param pMgmt unused.
 * @param pMsg  unused.
 * @return always 0.
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t result = 0;

  dError("auth rsp is received, but not supported yet");
  return result;
}
484

485
/**
 * Placeholder handler for grant responses: logs that the message is not
 * supported and reports success.
 *
 * @param pMgmt unused.
 * @param pMsg  unused.
 * @return always 0.
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const int32_t result = 0;

  dError("grant rsp is received, but not supported yet");
  return result;
}
489

490
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
182✔
491
  int32_t       code = 0;
182✔
492
  SDCfgDnodeReq cfgReq = {0};
182✔
493
  SConfig      *pCfg = taosGetCfg();
182✔
494
  SConfigItem  *pItem = NULL;
182✔
495

496
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
182!
497
    return TSDB_CODE_INVALID_MSG;
×
498
  }
499
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
182✔
500
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
5✔
501
  }
502

503
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
177!
504

505
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
177✔
506
  if (code != 0) {
177✔
507
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
2!
508
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
2!
509
      return TSDB_CODE_SUCCESS;
2✔
510
    } else {
511
      return code;
×
512
    }
513
  }
514
  if (pItem == NULL) {
175!
515
    return TSDB_CODE_CFG_NOT_FOUND;
×
516
  }
517

518
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
175!
519
    char value[10] = {0};
×
520
    sscanf(cfgReq.value, "%d", &tsSyncTimeout);
×
521

522
    if (tsSyncTimeout > 0) {
×
523
      SConfigItem *pItemTmp = NULL;
×
524
      char tmp[10] = {0};
×
525

526
      sprintf(tmp, "%d", tsSyncTimeout);
×
527
      TAOS_CHECK_RETURN(
×
528
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
529
      if (pItemTmp == NULL) {
×
530
        return TSDB_CODE_CFG_NOT_FOUND;
×
531
      }
532

533
      sprintf(tmp, "%d", tsSyncTimeout / 4);
×
534
      TAOS_CHECK_RETURN(
×
535
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
536
      if (pItemTmp == NULL) {
×
537
        return TSDB_CODE_CFG_NOT_FOUND;
×
538
      }
539
      TAOS_CHECK_RETURN(
×
540
          cfgGetAndSetItem(pCfg, &pItemTmp, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
541
      if (pItemTmp == NULL) {
×
542
        return TSDB_CODE_CFG_NOT_FOUND;
×
543
      }
544

545
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/2);
×
546
      TAOS_CHECK_RETURN(
×
547
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
548
      if (pItemTmp == NULL) {
×
549
        return TSDB_CODE_CFG_NOT_FOUND;
×
550
      }
551
      TAOS_CHECK_RETURN(
×
552
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
553
      if (pItemTmp == NULL) {
×
554
        return TSDB_CODE_CFG_NOT_FOUND;
×
555
      }
556
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
557
      if (pItemTmp == NULL) {
×
558
        return TSDB_CODE_CFG_NOT_FOUND;
×
559
      }
560

561
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4);
×
562
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
563
      if (pItemTmp == NULL) {
×
564
        return TSDB_CODE_CFG_NOT_FOUND;
×
565
      }
566

567
      sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/8);
×
568
      TAOS_CHECK_RETURN(
×
569
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
570
      if (pItemTmp == NULL) {
×
571
        return TSDB_CODE_CFG_NOT_FOUND;
×
572
      }
573
      TAOS_CHECK_RETURN(
×
574
          cfgGetAndSetItem(pCfg, &pItemTmp, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
575
      if (pItemTmp == NULL) {
×
576
        return TSDB_CODE_CFG_NOT_FOUND;
×
577
      }
578
      TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItemTmp, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
×
579
      if (pItemTmp == NULL) {
×
580
        return TSDB_CODE_CFG_NOT_FOUND;
×
581
      }
582

583
      dInfo("change syncTimeout, GetAndSetItem, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
584
            tsSyncTimeout);
585
    }
586
  }
587

588
  if (!isConifgItemLazyMode(pItem)) {
175✔
589
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
174!
590

591
    if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
174!
592
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
×
593
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
×
594
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
×
595

596
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
×
597
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
×
598
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
×
599
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
×
600
      
601
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true));
×
602
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true));
×
603
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true));
×
604

605
      dInfo("change syncTimeout, DynamicOptions, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value,
×
606
            tsSyncTimeout);
607
    }
608
  }
609

610
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
175✔
611
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
44✔
612
    if (code != TSDB_CODE_SUCCESS) {
44!
613
      dError("failed to persist global config since %s", tstrerror(code));
×
614
    }
615
  } else {
616
    code = taosPersistLocalConfig(pMgmt->path);
131✔
617
    if (code != TSDB_CODE_SUCCESS) {
131!
618
      dError("failed to persist local config since %s", tstrerror(code));
×
619
    }
620
  }
621

622
  if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
175!
623
    dInfo(
×
624
        "finished change syncTimeout, option:%s, value:%s", cfgReq.config, cfgReq.value);
625

626
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
627
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
628
  }
629

630
  if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
175!
631
    (*pMgmt->setVnodeSyncTimeoutFp)();
×
632
  }
633

634
  if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
175!
635
    (*pMgmt->setMnodeSyncTimeoutFp)();
×
636
  }
637

638
  if (cfgReq.version > 0) {
175✔
639
    tsdmConfigVersion = cfgReq.version;
94✔
640
  }
641
  return code;
175✔
642
}
643

644
/**
 * Handle a create-encrypt-key request.
 *
 * In TD_ENTERPRISE builds the request is decoded and the key is passed to
 * dmUpdateEncryptKey(); on success the global key checksum, key state and
 * key buffer are refreshed. The result code is stored in the message and the
 * response fields are cleared. In other builds the function returns 0
 * without touching the message.
 *
 * @param pMgmt dnode management context (not used in the visible code).
 * @param pMsg  request message holding a serialized SDCfgDnodeReq.
 * @return 0 on success, otherwise an error code.
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  int32_t       result = 0;
  SDCfgDnodeReq keyReq = {0};

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &keyReq) != 0) {
    result = TSDB_CODE_INVALID_MSG;
  } else {
    result = dmUpdateEncryptKey(keyReq.value, true);
    if (result == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, keyReq.value, strlen(keyReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, keyReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = result;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return result;
#else
  return 0;
#endif
}
669

670
/**
 * Fill pStatus with the server's run status.
 *
 * The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
 * downgraded to TSDB_SRV_STATUS_SERVICE_DEGRADED when this node is an mnode
 * whose sync state is ERROR or OFFLINE, or otherwise when the first vnode
 * found in such a state is encountered; details then names the offender.
 *
 * @param pMgmt   dnode management context providing the load callbacks.
 * @param pStatus output status structure.
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&mnodeInfo);
  if (mnodeInfo.isMnode) {
    if (mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR || mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
               syncStr(mnodeInfo.load.syncState));
      return;
    }
  }

  SMonVloadInfo vnodeInfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pVnode = taosArrayGet(vnodeInfo.pVloads, idx);
    if (pVnode->syncState == TAOS_SYNC_STATE_ERROR || pVnode->syncState == TAOS_SYNC_STATE_OFFLINE) {
      // Only the first unhealthy vnode is reported.
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVnode->vgId,
               syncStr(pVnode->syncState));
      break;
    }
    ++idx;
  }

  taosArrayDestroy(vnodeInfo.pVloads);
}
697

698
/**
 * Answer a server-run-status request.
 *
 * Collects the status with dmGetServerRunStatus(), serializes it into a
 * buffer obtained from rpcMallocCont() and attaches that buffer to
 * pMsg->info.rsp / rspLen. On any failure the response fields stay NULL / 0
 * and no buffer is left allocated.
 *
 * @param pMgmt dnode management context.
 * @param pMsg  request message; receives the response buffer on success.
 * @return 0 on success, otherwise an error code.
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never attached to the message, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
731

732
/**
 * Allocate a data block whose columns follow the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * The table is looked up by name in the metadata returned by
 * getInfosDbMeta(); if no entry matches, entry 0 is used. One
 * SColumnInfoData is pushed per schema column, with column ids starting at 1.
 *
 * @param ppBlock receives the new block on success; untouched on failure.
 * @return 0 on success, otherwise terrno from the failing allocation.
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNewBlock == NULL) {
    return terrno;
  }

  size_t               numOfTables = 0;
  const SSysTableMeta *pTables = NULL;
  getInfosDbMeta(&pTables, &numOfTables);

  // Locate the dnode-variables table; stays at 0 when it is not listed.
  int32_t tableIdx = 0;
  for (int32_t t = 0; t < numOfTables; ++t) {
    if (strcmp(pTables[t].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      tableIdx = t;
      break;
    }
  }

  pNewBlock->pDataBlock = taosArrayInit(pTables[tableIdx].colNum, sizeof(SColumnInfoData));
  if (pNewBlock->pDataBlock == NULL) {
    int32_t code = terrno;
    blockDataDestroy(pNewBlock);
    return code;
  }

  for (int32_t c = 0; c < pTables[tableIdx].colNum; ++c) {
    SColumnInfoData column = {0};
    column.info.colId = c + 1;
    column.info.type = pTables[tableIdx].schema[c].type;
    column.info.bytes = pTables[tableIdx].schema[c].bytes;
    if (taosArrayPush(pNewBlock->pDataBlock, &column) == NULL) {
      int32_t code = terrno;
      blockDataDestroy(pNewBlock);
      return code;
    }
  }

  pNewBlock->info.hasVarCol = true;
  *ppBlock = pNewBlock;
  return 0;
}
779

780
/**
 * Dump the configuration into pBlock and write dnodeId into every row of
 * the block's first column.
 *
 * @param pBlock  data block to fill.
 * @param dnodeId value stored in column 0 for all pBlock->info.rows rows.
 * @return 0 on success; the dumpConfToDataBlock() error, TSDB_CODE_OUT_OF_RANGE
 *         when column 0 is missing, or the colDataSetNItems() result.
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t result = dumpConfToDataBlock(pBlock, 1, NULL);

  if (result == 0) {
    SColumnInfoData *pFirstCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pFirstCol != NULL) {
      result = colDataSetNItems(pFirstCol, 0, (const char *)&dnodeId, pBlock->info.rows, false);
    } else {
      result = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return result;
}
793

794
/*
 * Serve a system-table retrieve request for the dnode variables table.
 *
 * The request carried in pMsg is deserialized; any table name other than
 * TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is rejected
 * with TSDB_CODE_INVALID_MSG.
 *
 * The response buffer is laid out as:
 *   SRetrieveMetaTableRsp header
 *   int32_t               number of columns (network byte order)
 *   SSysTableSchema[n]    one entry per column
 *   encoded data block    produced by blockEncode()
 *
 * On success the buffer is stored in pMsg->info.rsp / pMsg->info.rspLen and is
 * not freed here. On any failure the buffer (if allocated) is released and a
 * non-zero code is returned. The temporary data block is destroyed on every
 * path once it has been built.
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t                code = 0;
  int32_t                size = 0;
  size_t                 numOfCols = 0;
  size_t                 dataEncodeBufSize = 0;
  char                  *pStart = NULL;
  SSDataBlock           *pBlock = NULL;
  SRetrieveMetaTableRsp *pRsp = NULL;
  SRetrieveTableReq      retrieveReq = {0};

  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
  // The per-user permission check below is currently compiled out.
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  // On failure dmBuildVariablesBlock() leaves pBlock untouched (NULL), so
  // there is nothing to clean up yet.
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    goto _exit;
  }

  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    goto _exit;
  }

  pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  // Column schemas, multi-byte fields in network byte order.
  for (int32_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      // Same guard as dmAppendVariablesToBlock(); never dereference a missing column.
      code = TSDB_CODE_OUT_OF_RANGE;
      rpcFreeCont(pRsp);
      goto _exit;
    }

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  if (blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols) < 0) {
    // Capture terrno immediately: the logging and cleanup calls below may
    // overwrite it, and `code` is still 0 at this point.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    rpcFreeCont(pRsp);
    goto _exit;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

_exit:
  blockDataDestroy(pBlock);
  return code;
}
869

870
/*
 * Handle a stream heartbeat response message.
 *
 * If the message itself carries a non-zero code, that code is forwarded to
 * streamHbHandleRspErr(). Otherwise the bytes following the
 * SStreamMsgGrpHeader are decoded into an SMStreamHbRspMsg and passed to
 * streamHbProcessRspMsg(). A decode failure is reported to
 * streamHbHandleRspErr() as TSDB_CODE_INVALID_MSG.
 */
int32_t dmProcessStreamHbRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int64_t currTs = taosGetTimestampMs();

  if (pMsg->code) {
    return streamHbHandleRspErr(pMsg->code, currTs);
  }

  // The encoded response starts right after the group header.
  char   *pPayload = POINTER_SHIFT(pMsg->pCont, sizeof(SStreamMsgGrpHeader));
  int32_t payloadLen = pMsg->contLen - sizeof(SStreamMsgGrpHeader);

  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         decoder;
  tDecoderInit(&decoder, (uint8_t *)pPayload, payloadLen);

  int32_t decodeCode = tDecodeStreamHbRsp(&decoder, &hbRsp);
  if (decodeCode >= 0) {
    tDecoderClear(&decoder);
    return streamHbProcessRspMsg(&hbRsp);
  }

  // Decode failed: release whatever was partially decoded, then report.
  decodeCode = TSDB_CODE_INVALID_MSG;
  tDeepFreeSMStreamHbRspMsg(&hbRsp);
  tDecoderClear(&decoder);
  dError("fail to decode stream hb rsp msg, error:%s", tstrerror(decodeCode));
  return streamHbHandleRspErr(decodeCode, currTs);
}
896

897

898
SArray *dmGetMsgHandles() {
2,391✔
899
  int32_t code = -1;
2,391✔
900
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
2,391✔
901
  if (pArray == NULL) {
2,391!
902
    return NULL;
×
903
  }
904

905
  // Requests handled by DNODE
906
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
907
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
908
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
909
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
910
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
911
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
912
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
913
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
914
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
915
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
916
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
917
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
918
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
919
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
920
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, dmPutMsgToStreamMgmtQueue, 0) == NULL) goto _OVER;
2,391!
921

922
  // Requests handled by MNODE
923
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
924
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
925
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,391!
926

927
  code = 0;
2,391✔
928

929
_OVER:
2,391✔
930
  if (code != 0) {
2,391!
931
    taosArrayDestroy(pArray);
×
932
    return NULL;
×
933
  } else {
934
    return pArray;
2,391✔
935
  }
936
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc