• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.54
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24

25
extern SConfig *tsCfg;
26

27
SMonVloadInfo tsVinfo = {0};
28

29
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
114✔
30
  int32_t code = 0;
114✔
31
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
114✔
32
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
33!
33
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
33✔
34
    pMgmt->pData->dnodeId = pCfg->dnodeId;
33✔
35
    pMgmt->pData->clusterId = pCfg->clusterId;
33✔
36
    monSetDnodeId(pCfg->dnodeId);
33✔
37
    auditSetDnodeId(pCfg->dnodeId);
33✔
38
    code = dmWriteEps(pMgmt->pData);
33✔
39
    if (code != 0) {
33!
40
      dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
×
41
            tstrerror(code));
42
    }
43
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
33✔
44
  }
45
}
114✔
46

47
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
114✔
48
  int32_t code = 0;
114✔
49
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
114✔
50
  if (pMgmt->pData->ipWhiteVer == ver) {
114!
51
    if (ver == 0) {
114!
52
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
114✔
53
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
114!
54
        dError("failed to disable ip white list on dnode");
×
55
      }
56
    }
57
    return;
114✔
58
  }
UNCOV
59
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
×
60

UNCOV
61
  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
×
UNCOV
62
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
×
UNCOV
63
  if (contLen < 0) {
×
64
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
65
    return;
×
66
  }
UNCOV
67
  void *pHead = rpcMallocCont(contLen);
×
UNCOV
68
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
×
UNCOV
69
  if (contLen < 0) {
×
70
    rpcFreeCont(pHead);
×
71
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
72
    return;
×
73
  }
74

UNCOV
75
  SRpcMsg rpcMsg = {.pCont = pHead,
×
76
                    .contLen = contLen,
77
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE,
78
                    .info.ahandle = 0,
79
                    .info.notFreeAhandle = 1,
80
                    .info.refId = 0,
81
                    .info.noResp = 0,
82
                    .info.handle = 0};
UNCOV
83
  SEpSet  epset = {0};
×
84

UNCOV
85
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
86

UNCOV
87
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
UNCOV
88
  if (code != 0) {
×
89
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
90
  }
91
}
92

93
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
114✔
94
  int32_t code = 0;
114✔
95
  int64_t oldVer = taosAnalGetVersion();
114✔
96
  if (oldVer == newVer) return;
114!
97
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
98

99
  SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
100
  int32_t              contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
×
101
  if (contLen < 0) {
×
102
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
103
    return;
×
104
  }
105

106
  void *pHead = rpcMallocCont(contLen);
×
107
  contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
×
108
  if (contLen < 0) {
×
109
    rpcFreeCont(pHead);
×
110
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
111
    return;
×
112
  }
113

114
  SRpcMsg rpcMsg = {
×
115
      .pCont = pHead,
116
      .contLen = contLen,
117
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
118
      .info.ahandle = 0,
119
      .info.refId = 0,
120
      .info.noResp = 0,
121
      .info.handle = 0,
122
  };
123
  SEpSet epset = {0};
×
124

125
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
126

127
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
128
  if (code != 0) {
×
129
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
130
  }
131
}
132

133
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
7,087✔
134
  const STraceId *trace = &pRsp->info.traceId;
7,087✔
135
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
7,087!
136

137
  if (pRsp->code != 0) {
7,087!
138
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
139
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
×
140
             pMgmt->statusSeq);
141
      pMgmt->pData->dropped = 1;
×
142
      if (dmWriteEps(pMgmt->pData) != 0) {
×
143
        dError("failed to write dnode file");
×
144
      }
145
      dInfo("dnode will exit since it is in the dropped state");
×
146
      (void)raise(SIGINT);
×
147
    }
148
  } else {
149
    SStatusRsp statusRsp = {0};
7,087✔
150
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
7,201!
151
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
114✔
152
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
114!
153
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
114!
154
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
155
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
114✔
156
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
114✔
157
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
114✔
158
      }
159
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
114✔
160
      dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
114✔
161
    }
162
    tFreeSStatusRsp(&statusRsp);
7,087✔
163
  }
164
  rpcFreeCont(pRsp->pCont);
7,087✔
165
}
7,087✔
166

167
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
7,117✔
168
  int32_t    code = 0;
7,117✔
169
  SStatusReq req = {0};
7,117✔
170

171
  dDebug("send status req to mnode, statusSeq:%d, begin to mgnt lock", pMgmt->statusSeq);
7,117✔
172
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
7,117✔
173
  req.sver = tsVersion;
7,117✔
174
  req.dnodeVer = pMgmt->pData->dnodeVer;
7,117✔
175
  req.dnodeId = pMgmt->pData->dnodeId;
7,117✔
176
  req.clusterId = pMgmt->pData->clusterId;
7,117✔
177
  if (req.clusterId == 0) req.dnodeId = 0;
7,117✔
178
  req.rebootTime = pMgmt->pData->rebootTime;
7,117✔
179
  req.updateTime = pMgmt->pData->updateTime;
7,117✔
180
  req.numOfCores = tsNumOfCores;
7,117✔
181
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
7,117✔
182
  req.numOfDiskCfg = tsDiskCfgNum;
7,117✔
183
  req.memTotal = tsTotalMemoryKB * 1024;
7,117✔
184
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024;
7,117✔
185
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
7,117✔
186
  tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
7,117✔
187

188
  req.clusterCfg.statusInterval = tsStatusInterval;
7,117✔
189
  req.clusterCfg.checkTime = 0;
7,117✔
190
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
7,117✔
191
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
7,117✔
192
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
7,117✔
193
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
7,117✔
194
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
7,117✔
195
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
7,117✔
196
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
7,117✔
197
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
7,117✔
198
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
7,117✔
199
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
7,117✔
200
  char timestr[32] = "1970-01-01 00:00:00.00";
7,117✔
201
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL) !=
7,117!
202
      0) {
203
    dError("failed to parse time since %s", tstrerror(code));
×
204
  }
205
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
7,117✔
206
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
7,117✔
207
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
7,117✔
208
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
7,117✔
209

210
  dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
7,117✔
211
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
7,117!
212
    dError("failed to lock status info lock");
×
213
    return;
30✔
214
  }
215
  req.pVloads = tsVinfo.pVloads;
7,117✔
216
  tsVinfo.pVloads = NULL;
7,117✔
217
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
7,117!
218
    dError("failed to unlock status info lock");
×
219
    return;
×
220
  }
221

222
  dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
7,117✔
223
  SMonMloadInfo minfo = {0};
7,117✔
224
  (*pMgmt->getMnodeLoadsFp)(&minfo);
7,117✔
225
  req.mload = minfo.load;
7,117✔
226

227
  dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
7,117✔
228
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
7,117✔
229

230
  pMgmt->statusSeq++;
7,117✔
231
  req.statusSeq = pMgmt->statusSeq;
7,117✔
232
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
7,117✔
233
  req.analVer = taosAnalGetVersion();
7,117✔
234

235
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
7,117✔
236
  if (contLen < 0) {
7,117!
237
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
238
    return;
×
239
  }
240

241
  void *pHead = rpcMallocCont(contLen);
7,117✔
242
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
7,117✔
243
  if (contLen < 0) {
7,117!
244
    rpcFreeCont(pHead);
×
245
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
246
    return;
×
247
  }
248
  tFreeSStatusReq(&req);
7,117✔
249

250
  SRpcMsg rpcMsg = {.pCont = pHead,
7,117✔
251
                    .contLen = contLen,
252
                    .msgType = TDMT_MND_STATUS,
253
                    .info.ahandle = 0,
254
                    .info.notFreeAhandle = 1,
255
                    .info.refId = 0,
256
                    .info.noResp = 0,
257
                    .info.handle = 0};
258
  SRpcMsg rpcRsp = {0};
7,117✔
259

260
  dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
7,117✔
261

262
  SEpSet epSet = {0};
7,117✔
263
  int8_t epUpdated = 0;
7,117✔
264
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
7,117✔
265

266
  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
7,117✔
267
  code =
268
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
7,117✔
269
  if (code != 0) {
7,117✔
270
    dError("failed to send status req since %s", tstrerror(code));
30!
271
    return;
30✔
272
  }
273

274
  if (rpcRsp.code != 0) {
7,087!
275
    dmRotateMnodeEpSet(pMgmt->pData);
×
276
    char tbuf[512];
277
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
×
278
    dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
×
279
  } else {
280
    if (epUpdated == 1) {
7,087!
UNCOV
281
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
×
282
    }
283
  }
284
  dmProcessStatusRsp(pMgmt, &rpcRsp);
7,087✔
285
}
286

287
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
48✔
288
  const STraceId *trace = &pRsp->info.traceId;
48✔
289
  int32_t         code = 0;
48✔
290
  SConfigRsp      configRsp = {0};
48✔
291
  bool            needStop = false;
48✔
292

293
  if (pRsp->code != 0) {
48!
294
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
295
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
×
296
      pMgmt->pData->dropped = 1;
×
297
      if (dmWriteEps(pMgmt->pData) != 0) {
×
298
        dError("failed to write dnode file");
×
299
      }
300
      dInfo("dnode will exit since it is in the dropped state");
×
301
      (void)raise(SIGINT);
×
302
    }
303
  } else {
304
    bool needUpdate = false;
48✔
305
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
96!
306
        tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
48✔
307
      // Try to use cfg file in current dnode.
308
      if (configRsp.forceReadConfig) {
48!
309
        if (configRsp.isConifgVerified) {
×
310
          uInfo("force read config and check config verified");
×
311
          code = taosPersistGlobalConfig(taosGetGlobalCfg(tsCfg), pMgmt->path, configRsp.cver);
×
312
          if (code != TSDB_CODE_SUCCESS) {
×
313
            dError("failed to persist global config since %s", tstrerror(code));
×
314
            goto _exit;
×
315
          }
316
          needUpdate = true;
×
317
        } else {
318
          // log the difference configurations
319
          printConfigNotMatch(configRsp.array);
×
320
          needStop = true;
×
321
          goto _exit;
×
322
        }
323
      }
324
      // Try to use cfg from mnode sdb.
325
      if (!configRsp.isVersionVerified) {
48✔
326
        uInfo("config version not verified, update config");
41!
327
        needUpdate = true;
41✔
328
        code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
41✔
329
        if (code != TSDB_CODE_SUCCESS) {
41✔
330
          dError("failed to persist global config since %s", tstrerror(code));
8!
331
          goto _exit;
8✔
332
        }
333
      }
334
    }
335
    if (needUpdate) {
40✔
336
      code = cfgUpdateFromArray(tsCfg, configRsp.array);
33✔
337
      if (code != TSDB_CODE_SUCCESS) {
33!
338
        dError("failed to update config since %s", tstrerror(code));
×
339
        goto _exit;
×
340
      }
341
      code = setAllConfigs(tsCfg);
33✔
342
      if (code != TSDB_CODE_SUCCESS) {
33✔
343
        dError("failed to set all configs since %s", tstrerror(code));
1!
344
        goto _exit;
1✔
345
      }
346
    }
347
    code = taosPersistLocalConfig(pMgmt->path);
39✔
348
    if (code != TSDB_CODE_SUCCESS) {
39!
349
      dError("failed to persist local config since %s", tstrerror(code));
×
350
    }
351
    tsConfigInited = 1;
39✔
352
  }
353
_exit:
48✔
354
  tFreeSConfigRsp(&configRsp);
48✔
355
  rpcFreeCont(pRsp->pCont);
48✔
356
  if (needStop) {
48!
357
    dmStop();
×
358
  }
359
}
48✔
360

361
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
48✔
362
  int32_t    code = 0;
48✔
363
  SConfigReq req = {0};
48✔
364

365
  req.cver = tsdmConfigVersion;
48✔
366
  req.forceReadConfig = tsForceReadConfig;
48✔
367
  req.array = taosGetGlobalCfg(tsCfg);
48✔
368
  dDebug("send config req to mnode, configVersion:%d", req.cver);
48✔
369

370
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
48✔
371
  if (contLen < 0) {
48!
372
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
UNCOV
373
    return;
×
374
  }
375

376
  void *pHead = rpcMallocCont(contLen);
48✔
377
  if (pHead == NULL) {
48!
378
    dError("failed to malloc cont since %s", tstrerror(contLen));
×
379
    return;
×
380
  }
381
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
48✔
382
  if (contLen < 0) {
48!
383
    rpcFreeCont(pHead);
×
384
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
385
    return;
×
386
  }
387

388
  SRpcMsg rpcMsg = {.pCont = pHead,
48✔
389
                    .contLen = contLen,
390
                    .msgType = TDMT_MND_CONFIG,
391
                    .info.ahandle = 0,
392
                    .info.notFreeAhandle = 1,
393
                    .info.refId = 0,
394
                    .info.noResp = 0,
395
                    .info.handle = 0};
396
  SRpcMsg rpcRsp = {0};
48✔
397

398
  SEpSet epSet = {0};
48✔
399
  int8_t epUpdated = 0;
48✔
400
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
48✔
401

402
  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
48✔
403
  code =
404
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
48✔
405
  if (code != 0) {
48!
UNCOV
406
    dError("failed to send status req since %s", tstrerror(code));
×
UNCOV
407
    return;
×
408
  }
409
  if (rpcRsp.code != 0) {
48!
410
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
×
411
    return;
×
412
  }
413
  dmProcessConfigRsp(pMgmt, &rpcRsp);
48✔
414
}
415

416
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
7,220✔
417
  SMonVloadInfo vinfo = {0};
7,220✔
418
  dDebug("begin to get vnode loads");
7,220✔
419
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
7,220✔
420
  dDebug("begin to lock status info");
7,220✔
421
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
7,220!
422
    dError("failed to lock status info lock");
×
423
    return;
×
424
  }
425
  if (tsVinfo.pVloads == NULL) {
7,220✔
426
    tsVinfo.pVloads = vinfo.pVloads;
7,096✔
427
    vinfo.pVloads = NULL;
7,096✔
428
  } else {
429
    taosArrayDestroy(vinfo.pVloads);
124✔
430
    vinfo.pVloads = NULL;
124✔
431
  }
432
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
7,220!
433
    dError("failed to unlock status info lock");
×
434
    return;
×
435
  }
436
}
437

438
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
439
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
440
  if (contLen < 0) {
×
441
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
442
    return;
×
443
  }
444
  void *pHead = rpcMallocCont(contLen);
×
445
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
446
  if (contLen < 0) {
×
447
    rpcFreeCont(pHead);
×
448
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
449
    return;
×
450
  }
451

452
  SRpcMsg rpcMsg = {.pCont = pHead,
×
453
                    .contLen = contLen,
454
                    .msgType = TDMT_MND_NOTIFY,
455
                    .info.ahandle = 0,
456
                    .info.notFreeAhandle = 1,
457
                    .info.refId = 0,
458
                    .info.noResp = 1,
459
                    .info.handle = 0};
460

461
  SEpSet epSet = {0};
×
462
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
463
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
464
    dError("failed to send notify req");
×
465
  }
466
}
467

468
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
469
  dError("auth rsp is received, but not supported yet");
×
470
  return 0;
×
471
}
472

473
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
474
  dError("grant rsp is received, but not supported yet");
×
475
  return 0;
×
476
}
477

478
extern void    tsdbAlterNumCompactThreads();
479
static int32_t dmAlterMaxCompactTask(const char *value) {
×
480
  int32_t max_compact_tasks;
481
  char   *endptr = NULL;
×
482

483
  max_compact_tasks = taosStr2Int32(value, &endptr, 10);
×
484
  if (endptr == value || endptr[0] != '\0') {
×
485
    return TSDB_CODE_INVALID_MSG;
×
486
  }
487

488
  if (max_compact_tasks != tsNumOfCompactThreads) {
×
489
    dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
×
490
    tsNumOfCompactThreads = max_compact_tasks;
×
491
#ifdef TD_ENTERPRISE
NEW
492
    (void)tsdbAlterNumCompactThreads();
×
493
#endif
494
  }
495

496
  return TSDB_CODE_SUCCESS;
×
497
}
498

499
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
12✔
500
  int32_t       code = 0;
12✔
501
  SDCfgDnodeReq cfgReq = {0};
12✔
502
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
12!
503
    return TSDB_CODE_INVALID_MSG;
×
504
  }
505

506
  if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) {
12!
507
    return dmAlterMaxCompactTask(cfgReq.value);
×
508
  }
509

510
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
12!
511

512
  SConfig     *pCfg = taosGetCfg();
12✔
513
  SConfigItem *pItem = NULL;
12✔
514

515
  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
12✔
516
  if (code != 0) {
12!
UNCOV
517
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
×
UNCOV
518
      TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
×
UNCOV
519
      return TSDB_CODE_SUCCESS;
×
520
    } else {
521
      return code;
×
522
    }
523
  }
524
  if (pItem == NULL) {
12!
525
    return TSDB_CODE_CFG_NOT_FOUND;
×
526
  }
527
  if (!isConifgItemLazyMode(pItem)) {
12!
528
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
12!
529
  }
530

531
  if (pItem->category == CFG_CATEGORY_GLOBAL) {
12!
532
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
12✔
533
    if (code != TSDB_CODE_SUCCESS) {
12!
534
      dError("failed to persist global config since %s", tstrerror(code));
×
535
    }
536
  } else {
UNCOV
537
    code = taosPersistLocalConfig(pMgmt->path);
×
UNCOV
538
    if (code != TSDB_CODE_SUCCESS) {
×
539
      dError("failed to persist local config since %s", tstrerror(code));
×
540
    }
541
  }
542
  if (cfgReq.version > 0) {
12!
543
    tsdmConfigVersion = cfgReq.version;
12✔
544
  }
545
  return code;
12✔
546
}
547

548
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
549
#ifdef TD_ENTERPRISE
550
  int32_t       code = 0;
×
551
  SDCfgDnodeReq cfgReq = {0};
×
552
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
×
553
    code = TSDB_CODE_INVALID_MSG;
×
554
    goto _exit;
×
555
  }
556

557
  code = dmUpdateEncryptKey(cfgReq.value, true);
×
558
  if (code == 0) {
×
559
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
×
560
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
×
561
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
×
562
  }
563

564
_exit:
×
565
  pMsg->code = code;
×
566
  pMsg->info.rsp = NULL;
×
567
  pMsg->info.rspLen = 0;
×
568
  return code;
×
569
#else
570
  return 0;
571
#endif
572
}
573

UNCOV
574
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
×
UNCOV
575
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
×
UNCOV
576
  pStatus->details[0] = 0;
×
577

UNCOV
578
  SMonMloadInfo minfo = {0};
×
UNCOV
579
  (*pMgmt->getMnodeLoadsFp)(&minfo);
×
UNCOV
580
  if (minfo.isMnode &&
×
UNCOV
581
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
×
582
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
583
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
584
    return;
×
585
  }
586

UNCOV
587
  SMonVloadInfo vinfo = {0};
×
UNCOV
588
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
×
UNCOV
589
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
×
UNCOV
590
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
×
UNCOV
591
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
×
592
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
593
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
594
               syncStr(pLoad->syncState));
×
595
      break;
×
596
    }
597
  }
598

UNCOV
599
  taosArrayDestroy(vinfo.pVloads);
×
600
}
601

UNCOV
602
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
UNCOV
603
  int32_t code = 0;
×
UNCOV
604
  dDebug("server run status req is received");
×
UNCOV
605
  SServerStatusRsp statusRsp = {0};
×
UNCOV
606
  dmGetServerRunStatus(pMgmt, &statusRsp);
×
607

UNCOV
608
  pMsg->info.rsp = NULL;
×
UNCOV
609
  pMsg->info.rspLen = 0;
×
610

UNCOV
611
  SRpcMsg rspMsg = {.info = pMsg->info};
×
UNCOV
612
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
×
UNCOV
613
  if (rspLen < 0) {
×
614
    return TSDB_CODE_OUT_OF_MEMORY;
×
615
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
616
    // return rspMsg.code;
617
  }
618

UNCOV
619
  void *pRsp = rpcMallocCont(rspLen);
×
UNCOV
620
  if (pRsp == NULL) {
×
621
    return terrno;
×
622
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
623
    // return rspMsg.code;
624
  }
625

UNCOV
626
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
×
UNCOV
627
  if (rspLen < 0) {
×
628
    return TSDB_CODE_INVALID_MSG;
×
629
  }
630

UNCOV
631
  pMsg->info.rsp = pRsp;
×
UNCOV
632
  pMsg->info.rspLen = rspLen;
×
UNCOV
633
  return 0;
×
634
}
635

UNCOV
636
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
×
UNCOV
637
  int32_t code = 0;
×
638

UNCOV
639
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
640
  if (pBlock == NULL) {
×
641
    return terrno;
×
642
  }
643

UNCOV
644
  size_t size = 0;
×
645

UNCOV
646
  const SSysTableMeta *pMeta = NULL;
×
UNCOV
647
  getInfosDbMeta(&pMeta, &size);
×
648

UNCOV
649
  int32_t index = 0;
×
UNCOV
650
  for (int32_t i = 0; i < size; ++i) {
×
UNCOV
651
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
×
UNCOV
652
      index = i;
×
UNCOV
653
      break;
×
654
    }
655
  }
656

UNCOV
657
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
×
UNCOV
658
  if (pBlock->pDataBlock == NULL) {
×
659
    code = terrno;
×
660
    goto _exit;
×
661
  }
662

UNCOV
663
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
×
UNCOV
664
    SColumnInfoData colInfoData = {0};
×
UNCOV
665
    colInfoData.info.colId = i + 1;
×
UNCOV
666
    colInfoData.info.type = pMeta[index].schema[i].type;
×
UNCOV
667
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
×
UNCOV
668
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
×
669
      code = terrno;
×
670
      goto _exit;
×
671
    }
672
  }
673

UNCOV
674
  pBlock->info.hasVarCol = true;
×
UNCOV
675
_exit:
×
UNCOV
676
  if (code != 0) {
×
677
    blockDataDestroy(pBlock);
×
678
  } else {
UNCOV
679
    *ppBlock = pBlock;
×
680
  }
UNCOV
681
  return code;
×
682
}
683

UNCOV
684
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
×
UNCOV
685
  int32_t code = dumpConfToDataBlock(pBlock, 1);
×
UNCOV
686
  if (code != 0) {
×
687
    return code;
×
688
  }
689

UNCOV
690
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
×
UNCOV
691
  if (pColInfo == NULL) {
×
692
    return TSDB_CODE_OUT_OF_RANGE;
×
693
  }
694

UNCOV
695
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
×
696
}
697

UNCOV
698
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
UNCOV
699
  int32_t           size = 0;
×
UNCOV
700
  int32_t           rowsRead = 0;
×
UNCOV
701
  int32_t           code = 0;
×
UNCOV
702
  SRetrieveTableReq retrieveReq = {0};
×
UNCOV
703
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
×
704
    return TSDB_CODE_INVALID_MSG;
×
705
  }
706
#if 0
707
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
708
    code = TSDB_CODE_MND_NO_RIGHTS;
709
    return code;
710
  }
711
#endif
UNCOV
712
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
×
713
    return TSDB_CODE_INVALID_MSG;
×
714
  }
715

UNCOV
716
  SSDataBlock *pBlock = NULL;
×
UNCOV
717
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
×
718
    return code;
×
719
  }
720

UNCOV
721
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
×
UNCOV
722
  if (code != 0) {
×
723
    blockDataDestroy(pBlock);
×
724
    return code;
×
725
  }
726

UNCOV
727
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
UNCOV
728
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
×
UNCOV
729
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
×
730

UNCOV
731
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
×
UNCOV
732
  if (pRsp == NULL) {
×
733
    code = terrno;
×
734
    dError("failed to retrieve data since %s", tstrerror(code));
×
735
    blockDataDestroy(pBlock);
×
736
    return code;
×
737
  }
738

UNCOV
739
  char *pStart = pRsp->data;
×
UNCOV
740
  *(int32_t *)pStart = htonl(numOfCols);
×
UNCOV
741
  pStart += sizeof(int32_t);  // number of columns
×
742

UNCOV
743
  for (int32_t i = 0; i < numOfCols; ++i) {
×
UNCOV
744
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
×
UNCOV
745
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
×
746

UNCOV
747
    pSchema->bytes = htonl(pColInfo->info.bytes);
×
UNCOV
748
    pSchema->colId = htons(pColInfo->info.colId);
×
UNCOV
749
    pSchema->type = pColInfo->info.type;
×
750

UNCOV
751
    pStart += sizeof(SSysTableSchema);
×
752
  }
753

UNCOV
754
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
×
UNCOV
755
  if (len < 0) {
×
756
    dError("failed to retrieve data since %s", tstrerror(code));
×
757
    blockDataDestroy(pBlock);
×
758
    rpcFreeCont(pRsp);
×
759
    return terrno;
×
760
  }
761

UNCOV
762
  pRsp->numOfRows = htonl(pBlock->info.rows);
×
UNCOV
763
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
×
UNCOV
764
  pRsp->completed = 1;
×
UNCOV
765
  pMsg->info.rsp = pRsp;
×
UNCOV
766
  pMsg->info.rspLen = size;
×
UNCOV
767
  dDebug("dnode variables retrieve completed");
×
768

UNCOV
769
  blockDataDestroy(pBlock);
×
UNCOV
770
  return TSDB_CODE_SUCCESS;
×
771
}
772

773
SArray *dmGetMsgHandles() {
43✔
774
  int32_t code = -1;
43✔
775
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
43✔
776
  if (pArray == NULL) {
43!
777
    return NULL;
×
778
  }
779

780
  // Requests handled by DNODE
781
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
782
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
783
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
784
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
785
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
786
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
787
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
788
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
789
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
790
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
791
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
792

793
  // Requests handled by MNODE
794
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
795
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
796
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
43!
797

798
  code = 0;
43✔
799

800
_OVER:
43✔
801
  if (code != 0) {
43!
802
    taosArrayDestroy(pArray);
×
803
    return NULL;
×
804
  } else {
805
    return pArray;
43✔
806
  }
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc