• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3631

07 Mar 2025 03:18PM UTC coverage: 60.671% (-3.0%) from 63.629%
#3631

push

travis-ci

web-flow
Merge pull request #30074 from taosdata/ciup30

ci: update ci workflow to fix path issue

141481 of 300084 branches covered (47.15%)

Branch coverage included in aggregate %.

223132 of 300884 relevant lines covered (74.16%)

7878557.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.65
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24

25
extern SConfig *tsCfg;
26

27
// Vnode load list staged for the next status request: dmUpdateStatusInfo()
// fills pVloads when it is NULL, and dmSendStatusReq() takes it over and
// resets it to NULL. Both do so while holding pData->statusInfolock.
SMonVloadInfo tsVinfo = {0};
28
// Latest mnode load, written by dmUpdateStatusInfo() and copied into the
// status request by dmSendStatusReq(), both under pData->statusInfolock.
SMnodeLoad    tsMLoad = {0};
29
// Copy of selected pData fields (dnodeVer, dnodeId, clusterId, rebootTime,
// updateTime, machineId) refreshed by dmUpdateStatusInfo() and read by
// dmSendStatusReq() under pData->statusInfolock.
SDnodeData    tsDnodeData = {0};
30

31
/**
 * Adopt the dnode id and cluster id carried by a status response.
 *
 * Only acts while the local dnodeId or clusterId is still 0. In that case the
 * values from pCfg are stored under the pData write lock, passed on to the
 * monitor and audit modules, and written out through dmWriteEps().
 *
 * @param pMgmt dnode management context whose pData is updated
 * @param pCfg  dnode configuration taken from the status response
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  // Both identifiers already assigned: nothing to adopt.
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // Failing to write the new identity is an error, so report it at error level.
    dError("failed to set local info, dnodeId:%d clusterId:0x%" PRIx64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
7,494✔
48

49
/**
 * Compare the local ip-white-list version with the one reported by the mnode
 * and request a fresh list when they differ.
 *
 * When the versions match nothing is fetched; if that common version is 0 the
 * white list is cleared on the server RPC handle via rpcSetIpWhite(.., NULL).
 * Otherwise a TDMT_MND_RETRIEVE_IP_WHITE request carrying the local version is
 * sent to the current mnode endpoint set.
 *
 * @param pMgmt dnode management context
 * @param ver   ip-white-list version taken from the status response
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  int32_t code = 0;
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a failed allocation would be handed to the serializer and the RPC layer.
    dError("failed to malloc cont for ip white list request since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
94

95
/**
 * Request the analysis algorithm list from the mnode when the local analysis
 * version differs from the version reported in the status response.
 *
 * @param pMgmt  dnode management context
 * @param newVer analysis version taken from the status response
 */
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int32_t code = 0;
  int64_t oldVer = taosAnalyGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  // First pass with a NULL buffer only computes the encoded length.
  int32_t              contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a failed allocation would be handed to the serializer and the RPC layer.
    dError("failed to malloc cont for analysis function ver request since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
134

135
/**
 * Handle the mnode's answer to a status request and release its payload.
 *
 * On success the payload is decoded; a changed dnodeVer triggers an update of
 * the dnode config and endpoint list, and the ip-white-list and analysis
 * versions are then compared with the local ones. On
 * TSDB_CODE_MND_DNODE_NOT_EXIST, for a dnode that has an id and is not yet
 * marked dropped, the dnode is marked dropped, the state is written out and
 * SIGINT is raised. pRsp->pCont is freed on every path.
 *
 * @param pMgmt dnode management context
 * @param pRsp  response message; its pCont is released here
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* log macros pick it up by name — confirm
  // before renaming.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       hasPayload = (pRsp->pCont != NULL) && (pRsp->contLen > 0);

    if (hasPayload && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
    }
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
85,810✔
168

169
/**
 * Build a status request from the cached dnode/vnode/mnode information, send
 * it to the mnode and process the response.
 *
 * The cached data (tsDnodeData, tsVinfo, tsMLoad) is read while holding
 * pData->statusInfolock; ownership of tsVinfo.pVloads moves into the request.
 * The request is sent with rpcSendRecvWithTimeout() using a timeout of
 * tsStatusInterval * 5 * 1000. If the response carries an error code the mnode
 * endpoint set is rotated; otherwise an updated endpoint set is stored. The
 * response is then passed to dmProcessStatusRsp().
 *
 * @param pMgmt dnode management context
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};
  req.timestamp = taosGetTimestampMs();
  pMgmt->statusSeq++;

  dDebug("send status req to mnode, statusSeq:%d, begin to mgnt statusInfolock", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  dDebug("send status req to mnode, statusSeq:%d, begin to get dnode info", pMgmt->statusSeq);
  req.sver = tsVersion;
  req.dnodeVer = tsDnodeData.dnodeVer;
  req.dnodeId = tsDnodeData.dnodeId;
  req.clusterId = tsDnodeData.clusterId;
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = tsDnodeData.rebootTime;
  req.updateTime = tsDnodeData.updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the log reports the actual failure instead of code 0.
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  // NOTE(review): charset is copied with TD_LOCALE_LEN, same as locale — confirm this is the intended length.
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);

  dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);

  // Take ownership of the staged vnode loads; from here on every exit path must free req.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;

  dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
  req.mload = tsMLoad;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalyGetVersion();

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont for status req since %s", tstrerror(terrno));
    tFreeSStatusReq(&req);
    return;
  }
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar fields of req are read after tFreeSStatusReq().
  dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code =
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
  if (code != 0) {
    dError("failed to SendRecv with timeout %d status req since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
    return;
  }

  if (rpcRsp.code != 0) {
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
          tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
290

291
/**
 * Handle the mnode's answer to a config request and release its payload.
 *
 * Error response: on TSDB_CODE_MND_DNODE_NOT_EXIST, for a dnode that has an id
 * and is not yet marked dropped, the dnode is marked dropped, the state is
 * written out and SIGINT is raised.
 *
 * Successful response: the payload is decoded. With forceReadConfig set, a
 * verified config causes the local global config to be persisted; an
 * unverified one prints the mismatching items and stops the dnode. If the
 * config version is not verified, the array from the response is persisted.
 * When an update is needed the response array is applied to tsCfg and all
 * configs are set. Finally the local config is persisted and tsConfigInited
 * is set to 1.
 *
 * @param pMgmt dnode management context
 * @param pRsp  response message; its pCont is released here
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* log macros pick it up by name — confirm
  // before renaming.
  const STraceId *trace = &pRsp->info.traceId;
  SConfigRsp      configRsp = {0};
  int32_t         code = 0;
  bool            needStop = false;
  bool            needUpdate = false;

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
      tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
    // Try to use cfg file in current dnode.
    if (configRsp.forceReadConfig) {
      if (!configRsp.isConifgVerified) {
        // log the differing configurations
        printConfigNotMatch(configRsp.array);
        needStop = true;
        goto _exit;
      }

      uInfo("force read config and check config verified");
      code = taosPersistGlobalConfig(taosGetGlobalCfg(tsCfg), pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
      needUpdate = true;
    }

    // Try to use cfg from mnode sdb.
    if (!configRsp.isVersionVerified) {
      uInfo("config version not verified, update config");
      needUpdate = true;
      code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
    }
  }

  if (needUpdate) {
    code = cfgUpdateFromArray(tsCfg, configRsp.array);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to update config since %s", tstrerror(code));
      goto _exit;
    }

    code = setAllConfigs(tsCfg);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to set all configs since %s", tstrerror(code));
      goto _exit;
    }
  }

  // A failure here is only logged; initialization is still marked complete.
  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
  if (needStop) {
    dmStop();
  }
}
2,040✔
364

365
/**
 * Send the local config version and global config to the mnode and process
 * the response.
 *
 * The request is sent with rpcSendRecvWithTimeout() using a timeout of
 * tsStatusInterval * 5 * 1000. A transport failure or an error code in the
 * response is logged and ends the call; otherwise the response is handed to
 * dmProcessConfigRsp().
 *
 * @param pMgmt dnode management context
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a length here, not an error code; report terrno instead.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, begin to send rpc msg");
  code =
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
  if (code != 0) {
    dError("failed to SendRecv config req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    // The response payload is normally released by dmProcessConfigRsp(); release it here since that call is skipped.
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
419

420
/**
 * Refresh the cached data that dmSendStatusReq() puts into status requests.
 *
 * Dnode fields are copied from pData under its read lock, and vnode and mnode
 * loads are collected through the callbacks stored in pMgmt. Everything is
 * then published to tsDnodeData, tsVinfo and tsMLoad while holding
 * pData->statusInfolock. If tsVinfo already holds a vnode load list that has
 * not been taken yet, the freshly collected list is destroyed instead of
 * replacing it.
 *
 * @param pMgmt dnode management context
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  dDebug("begin to get dnode info");
  SDnodeData dnodeData = {0};
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
  dnodeData.dnodeId = pMgmt->pData->dnodeId;
  dnodeData.clusterId = pMgmt->pData->clusterId;
  dnodeData.rebootTime = pMgmt->pData->rebootTime;
  dnodeData.updateTime = pMgmt->pData->updateTime;
  tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("begin to get vnode loads");
  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);  // dmGetVnodeLoads

  dDebug("begin to get mnode loads");
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);  // dmGetMnodeLoads

  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    // The collected vnode loads were never published, so release them here.
    taosArrayDestroy(vinfo.pVloads);
    return;
  }
  tsDnodeData.dnodeVer = dnodeData.dnodeVer;
  tsDnodeData.dnodeId = dnodeData.dnodeId;
  tsDnodeData.clusterId = dnodeData.clusterId;
  tsDnodeData.rebootTime = dnodeData.rebootTime;
  tsDnodeData.updateTime = dnodeData.updateTime;
  tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);

  if (tsVinfo.pVloads == NULL) {
    // Hand the new list over to the shared slot.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    // A previous list is still pending; keep it and drop the new one.
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }

  tsMLoad = minfo.load;

  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
  }
}
467

468
/**
 * Serialize a notify request and send it to the mnode without waiting for a
 * response (info.noResp is set to 1).
 *
 * @param pMgmt dnode management context
 * @param pReq  notify request to serialize and send
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Without this check a failed allocation would be handed to the serializer and the RPC layer.
    dError("failed to malloc cont for notify req since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
497

498
/**
 * Placeholder handler for auth responses: logs that the message is not
 * supported and reports success.
 *
 * @param pMgmt dnode management context (not used)
 * @param pMsg  received message (not used)
 * @return always success (0)
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("auth rsp is received, but not supported yet");
  return TSDB_CODE_SUCCESS;
}
502

503
/**
 * Placeholder handler for grant responses: logs that the message is not
 * supported and reports success.
 *
 * @param pMgmt dnode management context (not used)
 * @param pMsg  received message (not used)
 * @return always success (0)
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("grant rsp is received, but not supported yet");
  return TSDB_CODE_SUCCESS;
}
507

508
/**
 * Apply a single configuration change requested for this dnode.
 *
 * "dataDir" is routed to taosUpdateTfsItemDisable(). Any other option is set
 * through cfgGetAndSetItem(); if that fails and the option name starts with
 * "resetlog", the option is applied through taosCfgDynamicOptions() instead.
 * Items that are not in lazy mode are applied dynamically, then the global or
 * local config is persisted depending on the item's category. A positive
 * request version replaces tsdmConfigVersion.
 *
 * @param pMgmt dnode management context
 * @param pMsg  request message containing a serialized SDCfgDnodeReq
 * @return 0 on success, TSDB_CODE_INVALID_MSG if the request cannot be
 *         decoded, TSDB_CODE_CFG_NOT_FOUND if no item was returned, otherwise
 *         the error code of the failing step
 */
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDCfgDnodeReq cfgReq = {0};
  SConfigItem  *pItem = NULL;
  SConfig      *pCfg = taosGetCfg();
  int32_t       code = 0;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
  }

  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);

  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
  if (code != 0) {
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) != 0) {
      return code;
    }
    // Options starting with "resetlog" are applied through the dynamic-options path instead.
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
    return TSDB_CODE_SUCCESS;
  }

  if (pItem == NULL) {
    return TSDB_CODE_CFG_NOT_FOUND;
  }
  if (!isConifgItemLazyMode(pItem)) {
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
  }

  // Persist failures are logged and returned, but do not stop the version update below.
  if (pItem->category != CFG_CATEGORY_GLOBAL) {
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
  } else {
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist global config since %s", tstrerror(code));
    }
  }

  if (cfgReq.version > 0) {
    tsdmConfigVersion = cfgReq.version;
  }
  return code;
}
555

556
/**
 * Handle a create-encrypt-key request.
 *
 * With TD_ENTERPRISE defined, the request is decoded and its value is passed
 * to dmUpdateEncryptKey(); on success the key checksum, key status and the
 * key itself are stored in the corresponding globals. The outcome is written
 * to pMsg->code and the response fields of pMsg are cleared. Without
 * TD_ENTERPRISE the function does nothing and returns 0.
 *
 * @param pMgmt dnode management context (not used in the visible code)
 * @param pMsg  request message containing a serialized SDCfgDnodeReq
 * @return 0 on success, TSDB_CODE_INVALID_MSG if decoding fails, otherwise
 *         the code returned by dmUpdateEncryptKey()
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifdef TD_ENTERPRISE
  SDCfgDnodeReq keyReq = {0};
  int32_t       code = TSDB_CODE_INVALID_MSG;  // stays in effect when decoding fails

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &keyReq) == 0) {
    code = dmUpdateEncryptKey(keyReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, keyReq.value, strlen(keyReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, keyReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#else
  return 0;
#endif
}
581

582
/**
 * Fill pStatus with the service status of this dnode.
 *
 * The status starts as TSDB_SRV_STATUS_SERVICE_OK with empty details. It is
 * changed to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a describing message, if
 * this node is an mnode whose sync state is ERROR or OFFLINE, or else for the
 * first vnode found in one of those states.
 *
 * @param pMgmt   dnode management context providing the load callbacks
 * @param pStatus output status structure
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);
  if (minfo.isMnode) {
    if (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
      return;
    }
  }

  SMonVloadInfo vinfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
    SVnodeLoad *pVnode = taosArrayGet(vinfo.pVloads, i);
    if (pVnode->syncState != TAOS_SYNC_STATE_ERROR && pVnode->syncState != TAOS_SYNC_STATE_OFFLINE) {
      continue;
    }

    // Report only the first unhealthy vnode.
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pVnode->vgId,
             syncStr(pVnode->syncState));
    break;
  }

  taosArrayDestroy(vinfo.pVloads);
}
609

610
/**
 * Answer a server-run-status request.
 *
 * Collects the current status with dmGetServerRunStatus(), serializes it into
 * a buffer obtained from rpcMallocCont() and attaches that buffer to
 * pMsg->info.rsp / rspLen. On any failure the response fields stay NULL / 0.
 *
 * @param pMgmt dnode management context
 * @param pMsg  request message; receives the serialized response on success
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the size computation
 *         fails, terrno if the buffer cannot be allocated,
 *         TSDB_CODE_INVALID_MSG if serialization into the buffer fails
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer has not been attached to pMsg yet, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
643

644
/**
 * Allocate a data block whose columns follow the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * The table is looked up by name in the metadata returned by
 * getInfosDbMeta(); if no entry matches, the entry at index 0 is used. One
 * SColumnInfoData is pushed per schema column, with colId starting at 1.
 *
 * @param ppBlock receives the new block on success; left untouched on failure
 * @return 0 on success, otherwise terrno from the failing allocation
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return terrno;
  }

  const SSysTableMeta *pMeta = NULL;
  size_t               size = 0;
  getInfosDbMeta(&pMeta, &size);

  // Locate the dnode-variables table; tableIdx stays 0 when it is not found.
  int32_t tableIdx = 0;
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) != 0) {
      continue;
    }
    tableIdx = i;
    break;
  }

  int32_t code = 0;
  pBlock->pDataBlock = taosArrayInit(pMeta[tableIdx].colNum, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    code = terrno;
    blockDataDestroy(pBlock);
    return code;
  }

  for (int32_t col = 0; col < pMeta[tableIdx].colNum; ++col) {
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = col + 1;
    colInfoData.info.type = pMeta[tableIdx].schema[col].type;
    colInfoData.info.bytes = pMeta[tableIdx].schema[col].bytes;
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
      code = terrno;
      blockDataDestroy(pBlock);
      return code;
    }
  }

  pBlock->info.hasVarCol = true;
  *ppBlock = pBlock;
  return code;
}
691

692
/**
 * Dump the configuration into pBlock and write dnodeId into its first column
 * for every row of the block.
 *
 * @param pBlock  data block to fill
 * @param dnodeId id written into column 0
 * @return 0 on success, the code from dumpConfToDataBlock() or
 *         colDataSetNItems() on failure, or TSDB_CODE_OUT_OF_RANGE if the
 *         block has no column 0
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t code = dumpConfToDataBlock(pBlock, 1, NULL);

  if (code == 0) {
    SColumnInfoData *pFirstCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pFirstCol != NULL) {
      code = colDataSetNItems(pFirstCol, 0, (const char *)&dnodeId, pBlock->info.rows, false);
    } else {
      code = TSDB_CODE_OUT_OF_RANGE;
    }
  }

  return code;
}
705

706
/**
 * Serve a system-table retrieve request for the dnode variables table.
 *
 * Only TSDB_INS_TABLE_DNODE_VARIABLES is accepted (case-insensitive). The
 * response buffer holds an SRetrieveMetaTableRsp header followed by the column
 * count, one SSysTableSchema per column and the encoded data block. On success
 * the buffer is attached to pMsg->info.rsp / rspLen.
 *
 * @param pMgmt dnode management context (supplies the dnode id)
 * @param pMsg  request message; receives the response on success
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_MSG for an
 *         undecodable request or a different table, otherwise the error code
 *         of the failing step
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  for (int32_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before cleanup so the logged and returned code is the encode failure.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
781

782
SArray *dmGetMsgHandles() {
2,138✔
783
  int32_t code = -1;
2,138✔
784
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
2,138✔
785
  if (pArray == NULL) {
2,138!
786
    return NULL;
×
787
  }
788

789
  // Requests handled by DNODE
790
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
791
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
792
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
793
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
794
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
795
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
796
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
797
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
798
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
799
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
800
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
801

802
  // Requests handled by MNODE
803
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
804
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
805
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,138!
806

807
  code = 0;
2,138✔
808

809
_OVER:
2,138✔
810
  if (code != 0) {
2,138!
811
    taosArrayDestroy(pArray);
×
812
    return NULL;
×
813
  } else {
814
    return pArray;
2,138✔
815
  }
816
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc