• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23
#include "tutil.h"
24

25
extern SConfig *tsCfg;
26

27
SMonVloadInfo tsVinfo = {0};
28

UNCOV
29
/*
 * Adopt the dnode id and cluster id carried in pCfg when the local record
 * still has either of them set to zero.
 *
 * Under the write lock of pMgmt->pData the two ids are copied, the dnode id
 * is forwarded through monSetDnodeId() and auditSetDnodeId(), and the result
 * is written out with dmWriteEps(). A write failure is reported but does not
 * undo the in-memory update.
 */
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
  // Both identifiers already known: nothing to adopt.
  if (pMgmt->pData->dnodeId != 0 && pMgmt->pData->clusterId != 0) {
    return;
  }

  dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
  (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
  pMgmt->pData->dnodeId = pCfg->dnodeId;
  pMgmt->pData->clusterId = pCfg->clusterId;
  monSetDnodeId(pCfg->dnodeId);
  auditSetDnodeId(pCfg->dnodeId);

  int32_t code = dmWriteEps(pMgmt->pData);
  if (code != 0) {
    // Report at error level, matching how other dmWriteEps failures in this file are logged.
    dError("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
           tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
×
46

UNCOV
47
/*
 * Compare the local ip-white-list version with the one reported by the mnode
 * (ver) and, when they differ, send a TDMT_MND_RETRIEVE_IP_WHITE request.
 *
 * When the versions match nothing is fetched; if that common version is 0 the
 * white list is cleared on the server rpc handle via rpcSetIpWhite(..., NULL).
 * All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
  if (pMgmt->pData->ipWhiteVer == ver) {
    if (ver == 0) {
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
        dError("failed to disable ip white list on dnode");
      }
    }
    return;
  }
  int64_t oldVer = pMgmt->pData->ipWhiteVer;

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation fails.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }

  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SEpSet  epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
  }
}
92

UNCOV
93
/*
 * Ask the mnode for the analysis-algorithm list when the version reported in
 * the status response (newVer) differs from taosAnalGetVersion().
 *
 * Sends a TDMT_MND_RETRIEVE_ANAL_ALGO request carrying the local dnode id and
 * the current local version. All failures are logged and otherwise ignored.
 */
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  int64_t oldVer = taosAnalGetVersion();
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);

  // First pass with a NULL buffer only computes the encoded length.
  SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
  int32_t              contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation fails.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }

  contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {
      .pCont = pHead,
      .contLen = contLen,
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
      .info.ahandle = 0,
      .info.refId = 0,
      .info.noResp = 0,
      .info.handle = 0,
  };
  SEpSet epset = {0};

  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);

  int32_t code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
  if (code != 0) {
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
  }
}
132

UNCOV
133
/*
 * Consume the mnode's reply to a status request and release its payload.
 *
 * Success: decode the SStatusRsp; if the dnode version changed, record it and
 * refresh the local dnode config and endpoint list, then check whether the
 * ip-white-list and analysis-function versions need refreshing.
 * Failure with TSDB_CODE_MND_DNODE_NOT_EXIST (for a dnode that has an id and is
 * not yet marked dropped): mark it dropped, persist that, and raise SIGINT.
 */
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced explicitly below; presumably the dG* log macros pick it up by name.
  const STraceId *trace = &pRsp->info.traceId;
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);

  if (pRsp->code == 0) {
    SStatusRsp statusRsp = {0};
    bool       decoded = pRsp->pCont != NULL && pRsp->contLen > 0 &&
                   tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0;
    if (decoded) {
      if (statusRsp.dnodeVer != pMgmt->pData->dnodeVer) {
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
      }
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
      dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
    }
    // Released whether or not decoding succeeded.
    tFreeSStatusRsp(&statusRsp);
  } else if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
    dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
           pMgmt->statusSeq);
    pMgmt->pData->dropped = 1;
    if (dmWriteEps(pMgmt->pData) != 0) {
      dError("failed to write dnode file");
    }
    dInfo("dnode will exit since it is in the dropped state");
    (void)raise(SIGINT);
  }

  rpcFreeCont(pRsp->pCont);
}
×
166

UNCOV
167
/*
 * Build a status report for this dnode, send it to the mnode with
 * rpcSendRecvWithTimeout() and pass the reply to dmProcessStatusRsp().
 *
 * The report is assembled in three steps: identity and cluster settings are
 * copied under the read lock of pMgmt->pData; the cached vnode loads are taken
 * over from tsVinfo under statusInfolock (leaving tsVinfo.pVloads NULL); mnode
 * and qnode loads are fetched through the callbacks stored in pMgmt.
 *
 * Every early return after the vnode loads were taken releases the request
 * with tFreeSStatusReq() so the array handed over from tsVinfo is not leaked
 * (assumes tFreeSStatusReq releases req.pVloads -- TODO confirm).
 */
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SStatusReq req = {0};

  dDebug("send status req to mnode, statusSeq:%d, begin to mgnt lock", pMgmt->statusSeq);
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
  req.sver = tsVersion;
  req.dnodeVer = pMgmt->pData->dnodeVer;
  req.dnodeId = pMgmt->pData->dnodeId;
  req.clusterId = pMgmt->pData->clusterId;
  // A dnode id is only reported together with a cluster id.
  if (req.clusterId == 0) req.dnodeId = 0;
  req.rebootTime = pMgmt->pData->rebootTime;
  req.updateTime = pMgmt->pData->updateTime;
  req.numOfCores = tsNumOfCores;
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  req.numOfDiskCfg = tsDiskCfgNum;
  req.memTotal = tsTotalMemoryKB * 1024;
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
  tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.checkTime = 0;
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  char timestr[32] = "1970-01-01 00:00:00.00";
  // Keep the return value so the log reports the real failure instead of tstrerror(0).
  code = taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, NULL);
  if (code != 0) {
    dError("failed to parse time since %s", tstrerror(code));
  }
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);

  dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }
  // Take ownership of the cached vnode loads; the cache slot is emptied.
  req.pVloads = tsVinfo.pVloads;
  tsVinfo.pVloads = NULL;
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    tFreeSStatusReq(&req);
    return;
  }

  dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
  SMonMloadInfo minfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&minfo);
  req.mload = minfo.load;

  dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
  (*pMgmt->getQnodeLoadsFp)(&req.qload);

  pMgmt->statusSeq++;
  req.statusSeq = pMgmt->statusSeq;
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
  req.analVer = taosAnalGetVersion();

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    dError("failed to malloc cont since %s", tstrerror(terrno));
    tFreeSStatusReq(&req);
    return;
  }

  contLen = tSerializeSStatusReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize status req since %s", tstrerror(contLen));
    tFreeSStatusReq(&req);
    return;
  }
  tFreeSStatusReq(&req);

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_STATUS,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  // Only scalar members of req are read after tFreeSStatusReq().
  dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
  code =
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
  if (code != 0) {
    dError("failed to send status req since %s", tstrerror(code));
    return;
  }

  if (rpcRsp.code != 0) {
    // The mnode answered with an error: try the next endpoint on the following round.
    dmRotateMnodeEpSet(pMgmt->pData);
    char tbuf[512];
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
    dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
  } else {
    if (epUpdated == 1) {
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
    }
  }
  // Handles both success and error replies and frees rpcRsp.pCont.
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}
286

UNCOV
287
/*
 * Consume the mnode's reply to a config request and release its payload.
 *
 * Error reply: a TSDB_CODE_MND_DNODE_NOT_EXIST answer for a dnode that has an
 * id and is not yet flagged dropped marks it dropped, persists that and raises
 * SIGINT; other errors are ignored.
 *
 * Successful reply, once decoded:
 *  - forceReadConfig + verified: persist the current global config under the
 *    version sent by the mnode and schedule an in-memory update;
 *  - forceReadConfig + not verified: print the mismatches and stop the dnode;
 *  - version not verified: persist the array from the reply and schedule an
 *    in-memory update.
 * Afterwards the scheduled update is applied, the local config is persisted
 * and tsConfigInited is set. Any failure before that skips to cleanup.
 */
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  // NOTE(review): `trace` is not referenced explicitly below; presumably the dG* log macros pick it up by name.
  const STraceId *trace = &pRsp->info.traceId;
  SConfigRsp      configRsp = {0};
  int32_t         code = 0;
  bool            needStop = false;
  bool            needUpdate = false;
  bool            decoded = false;

  if (pRsp->code != 0) {
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dGInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      if (dmWriteEps(pMgmt->pData) != 0) {
        dError("failed to write dnode file");
      }
      dInfo("dnode will exit since it is in the dropped state");
      (void)raise(SIGINT);
    }
    goto _exit;
  }

  decoded = pRsp->pCont != NULL && pRsp->contLen > 0 &&
            tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0;
  if (decoded) {
    // Try to use cfg file in current dnode.
    if (configRsp.forceReadConfig) {
      if (!configRsp.isConifgVerified) {
        // log the difference configurations
        printConfigNotMatch(configRsp.array);
        needStop = true;
        goto _exit;
      }
      uInfo("force read config and check config verified");
      code = taosPersistGlobalConfig(taosGetGlobalCfg(tsCfg), pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
      needUpdate = true;
    }
    // Try to use cfg from mnode sdb.
    if (!configRsp.isVersionVerified) {
      uInfo("config version not verified, update config");
      needUpdate = true;
      code = taosPersistGlobalConfig(configRsp.array, pMgmt->path, configRsp.cver);
      if (code != TSDB_CODE_SUCCESS) {
        dError("failed to persist global config since %s", tstrerror(code));
        goto _exit;
      }
    }
  }

  if (needUpdate) {
    code = cfgUpdateFromArray(tsCfg, configRsp.array);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to update config since %s", tstrerror(code));
      goto _exit;
    }
    code = setAllConfigs(tsCfg);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to set all configs since %s", tstrerror(code));
      goto _exit;
    }
  }

  // A failure to persist the local config is logged but does not block initialisation.
  code = taosPersistLocalConfig(pMgmt->path);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to persist local config since %s", tstrerror(code));
  }
  tsConfigInited = 1;

_exit:
  tFreeSConfigRsp(&configRsp);
  rpcFreeCont(pRsp->pCont);
  if (needStop) {
    dmStop();
  }
}
×
360

UNCOV
361
/*
 * Send this dnode's global configuration and config version to the mnode with
 * rpcSendRecvWithTimeout() and pass a successful reply to dmProcessConfigRsp().
 *
 * Failures are logged and the function returns without retrying. When the
 * mnode answers with an error code the reply payload is released here, since
 * dmProcessConfigRsp() (which otherwise frees it) is not called on that path.
 */
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
  int32_t    code = 0;
  SConfigReq req = {0};

  req.cver = tsdmConfigVersion;
  req.forceReadConfig = tsForceReadConfig;
  req.array = taosGetGlobalCfg(tsCfg);
  dDebug("send config req to mnode, configVersion:%d", req.cver);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // contLen is a length here, not an error code; report terrno instead.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }
  contLen = tSerializeSConfigReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize config req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_CONFIG,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 0,
                    .info.handle = 0};
  SRpcMsg rpcRsp = {0};

  SEpSet epSet = {0};
  int8_t epUpdated = 0;
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);

  dDebug("send config req to mnode, configVersion:%d, begin to send rpc msg", req.cver);
  code =
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
  if (code != 0) {
    dError("failed to send config req since %s", tstrerror(code));
    return;
  }
  if (rpcRsp.code != 0) {
    dError("failed to send config req since %s", tstrerror(rpcRsp.code));
    rpcFreeCont(rpcRsp.pCont);
    return;
  }
  dmProcessConfigRsp(pMgmt, &rpcRsp);
}
415

UNCOV
416
/*
 * Refresh the cached vnode loads in tsVinfo.
 *
 * Fresh loads are fetched through pMgmt->getVnodeLoadsFp and, under
 * statusInfolock, stored into tsVinfo.pVloads only when that slot is empty;
 * otherwise the fresh array is destroyed and the cached one is kept.
 * The fetched array is also destroyed when the lock cannot be taken, so it is
 * never leaked.
 */
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
  SMonVloadInfo vinfo = {0};
  dDebug("begin to get vnode loads");
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
  dDebug("begin to lock status info");
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
    return;
  }
  if (tsVinfo.pVloads == NULL) {
    // Hand the array over to the cache.
    tsVinfo.pVloads = vinfo.pVloads;
    vinfo.pVloads = NULL;
  } else {
    taosArrayDestroy(vinfo.pVloads);
    vinfo.pVloads = NULL;
  }
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }
}
437

438
/*
 * Serialize pReq and send it to the mnode as a TDMT_MND_NOTIFY message.
 *
 * The message is sent with info.noResp = 1, so no reply is requested.
 * Failures are logged and otherwise ignored.
 */
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
  if (contLen < 0) {
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  void *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Do not serialize into a NULL buffer when the allocation fails.
    dError("failed to malloc cont since %s", tstrerror(terrno));
    return;
  }

  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize notify req since %s", tstrerror(contLen));
    return;
  }

  SRpcMsg rpcMsg = {.pCont = pHead,
                    .contLen = contLen,
                    .msgType = TDMT_MND_NOTIFY,
                    .info.ahandle = 0,
                    .info.notFreeAhandle = 1,
                    .info.refId = 0,
                    .info.noResp = 1,
                    .info.handle = 0};

  SEpSet epSet = {0};
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
    dError("failed to send notify req");
  }
}
467

468
/*
 * Placeholder handler for auth responses: logs that they are unsupported and
 * reports success. Neither argument is used.
 */
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("auth rsp is received, but not supported yet");
  return 0;
}
472

473
/*
 * Placeholder handler for grant responses: logs that they are unsupported and
 * reports success. Neither argument is used.
 */
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  (void)pMgmt;
  (void)pMsg;

  dError("grant rsp is received, but not supported yet");
  return 0;
}
477

UNCOV
478
/*
 * Apply a single configuration change requested for this dnode.
 *
 * "dataDir" is delegated to taosUpdateTfsItemDisable(). Any other option is
 * set through cfgGetAndSetItem(); if that fails, options whose name starts
 * with "resetlog" are still forwarded to taosCfgDynamicOptions(), everything
 * else returns the failure code. After a successful set, non-lazy items are
 * applied dynamically, the matching config (global or local) is persisted and
 * a positive version from the request becomes the new tsdmConfigVersion.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be decoded,
 * TSDB_CODE_CFG_NOT_FOUND when no item was resolved, otherwise the code of
 * the last persist step (a persist failure is logged and returned, but the
 * version is still updated).
 */
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDCfgDnodeReq cfgReq = {0};
  SConfigItem  *pItem = NULL;
  SConfig      *pCfg = taosGetCfg();
  int32_t       code = 0;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (strcasecmp(cfgReq.config, "dataDir") == 0) {
    return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
  }

  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);

  code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);
  if (code != 0) {
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) != 0) {
      return code;
    }
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
    return TSDB_CODE_SUCCESS;
  }

  if (pItem == NULL) {
    return TSDB_CODE_CFG_NOT_FOUND;
  }

  // Lazy-mode items are only stored; everything else takes effect immediately.
  if (!isConifgItemLazyMode(pItem)) {
    TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));
  }

  if (pItem->category != CFG_CATEGORY_GLOBAL) {
    code = taosPersistLocalConfig(pMgmt->path);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist local config since %s", tstrerror(code));
    }
  } else {
    code = taosPersistGlobalConfig(taosGetGlobalCfg(pCfg), pMgmt->path, tsdmConfigVersion);
    if (code != TSDB_CODE_SUCCESS) {
      dError("failed to persist global config since %s", tstrerror(code));
    }
  }

  if (cfgReq.version > 0) {
    tsdmConfigVersion = cfgReq.version;
  }
  return code;
}
525

526
/*
 * Install the encryption key carried in a create-encrypt-key request.
 *
 * Without TD_ENTERPRISE this is a no-op that returns 0. With it, the request
 * is decoded and its value passed to dmUpdateEncryptKey(); on success the
 * global checksum, key state and key string are updated. The outcome is
 * stored in pMsg->code, the response payload is cleared, and the same code is
 * returned (TSDB_CODE_INVALID_MSG when decoding fails).
 */
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
#ifndef TD_ENTERPRISE
  return 0;
#else
  SDCfgDnodeReq cfgReq = {0};
  int32_t       code = TSDB_CODE_INVALID_MSG;

  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) == 0) {
    code = dmUpdateEncryptKey(cfgReq.value, true);
    if (code == 0) {
      tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
      tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
      tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
    }
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;
  return code;
#endif
}
551

UNCOV
552
/*
 * Fill pStatus with the service status of this dnode.
 *
 * Starts from TSDB_SRV_STATUS_SERVICE_OK with empty details. The status turns
 * to TSDB_SRV_STATUS_SERVICE_DEGRADED, with a describing detail string, if the
 * local mnode (when present) or the first vnode found reports a sync state of
 * TAOS_SYNC_STATE_ERROR or TAOS_SYNC_STATE_OFFLINE. Vnodes are not inspected
 * once the mnode is found degraded.
 */
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SMonMloadInfo mnodeInfo = {0};
  (*pMgmt->getMnodeLoadsFp)(&mnodeInfo);
  if (mnodeInfo.isMnode) {
    bool mnodeBad =
        mnodeInfo.load.syncState == TAOS_SYNC_STATE_ERROR || mnodeInfo.load.syncState == TAOS_SYNC_STATE_OFFLINE;
    if (mnodeBad) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s",
               syncStr(mnodeInfo.load.syncState));
      return;
    }
  }

  SMonVloadInfo vnodeInfo = {0};
  (*pMgmt->getVnodeLoadsFp)(&vnodeInfo);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(vnodeInfo.pVloads)) {
    SVnodeLoad *pLoad = taosArrayGet(vnodeInfo.pVloads, idx);
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
      // Only the first degraded vnode is reported.
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
               syncStr(pLoad->syncState));
      break;
    }
    ++idx;
  }

  taosArrayDestroy(vnodeInfo.pVloads);
}
579

UNCOV
580
/*
 * Answer a server-run-status request.
 *
 * Collects the status with dmGetServerRunStatus(), serializes it into a buffer
 * obtained from rpcMallocCont() and attaches it to pMsg->info.rsp / rspLen.
 * On any failure the response fields stay NULL / 0 and the buffer, if already
 * allocated, is released.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the length computation
 * fails, terrno when the buffer cannot be allocated, and
 * TSDB_CODE_INVALID_MSG when the final serialization fails.
 */
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    return terrno;
  }

  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
  if (rspLen < 0) {
    // The buffer was never handed to pMsg, so it must be released here.
    rpcFreeCont(pRsp);
    return TSDB_CODE_INVALID_MSG;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
  return 0;
}
613

UNCOV
614
/*
 * Allocate an empty data block whose columns follow the schema of the
 * TSDB_INS_TABLE_DNODE_VARIABLES system table.
 *
 * The table is looked up by name in the list returned by getInfosDbMeta(); if
 * no entry matches, the first entry (index 0) is used. One SColumnInfoData per
 * schema column is pushed with colId = position + 1.
 *
 * On success *ppBlock receives the block and 0 is returned. On failure the
 * block is destroyed, *ppBlock is left untouched and terrno is returned.
 */
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return terrno;
  }

  int32_t              code = 0;
  size_t               size = 0;
  const SSysTableMeta *pMeta = NULL;
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
  int32_t pos = 0;
  while (pos < size) {
    if (strcmp(pMeta[pos].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
      index = pos;
      break;
    }
    ++pos;
  }

  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t col = 0; col < pMeta[index].colNum; ++col) {
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = col + 1;
    colInfoData.info.type = pMeta[index].schema[col].type;
    colInfoData.info.bytes = pMeta[index].schema[col].bytes;
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  pBlock->info.hasVarCol = true;

_exit:
  if (code == 0) {
    *ppBlock = pBlock;
  } else {
    blockDataDestroy(pBlock);
  }
  return code;
}
661

UNCOV
662
/*
 * Dump the configuration into pBlock with dumpConfToDataBlock(pBlock, 1) and
 * then set column 0 to dnodeId for pBlock->info.rows rows.
 *
 * Returns the dump error if any, TSDB_CODE_OUT_OF_RANGE when the block has no
 * column 0, otherwise the result of colDataSetNItems().
 */
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
  int32_t dumpCode = dumpConfToDataBlock(pBlock, 1);
  if (dumpCode != 0) {
    return dumpCode;
  }

  SColumnInfoData *pIdCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (pIdCol != NULL) {
    return colDataSetNItems(pIdCol, 0, (const char *)&dnodeId, pBlock->info.rows, false);
  }
  return TSDB_CODE_OUT_OF_RANGE;
}
675

UNCOV
676
/*
 * Answer a system-table retrieve request for the dnode variables table.
 *
 * Only TSDB_INS_TABLE_DNODE_VARIABLES (compared case-insensitively) is
 * accepted. The response buffer is laid out as: SRetrieveMetaTableRsp header,
 * a 32-bit column count, one SSysTableSchema per column, then the block as
 * encoded by blockEncode(). Multi-byte header fields are written with
 * htonl()/htons().
 *
 * On success the buffer is attached to pMsg->info.rsp / rspLen and
 * TSDB_CODE_SUCCESS is returned. On failure every intermediate resource is
 * released and an error code is returned: TSDB_CODE_INVALID_MSG for an
 * undecodable request or another table, TSDB_CODE_OUT_OF_RANGE for a missing
 * column, otherwise the code of the failing step.
 */
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t           size = 0;
  int32_t           code = 0;
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
    code = TSDB_CODE_MND_NO_RIGHTS;
    return code;
  }
#endif
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    return TSDB_CODE_INVALID_MSG;
  }

  SSDataBlock *pBlock = NULL;
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
    return code;
  }

  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
  if (code != 0) {
    blockDataDestroy(pBlock);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    return code;
  }

  char *pStart = pRsp->data;
  *(int32_t *)pStart = htonl(numOfCols);
  pStart += sizeof(int32_t);  // number of columns

  for (size_t i = 0; i < numOfCols; ++i) {
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      // Same error code dmAppendVariablesToBlock() uses for a missing column.
      blockDataDestroy(pBlock);
      rpcFreeCont(pRsp);
      return TSDB_CODE_OUT_OF_RANGE;
    }

    pSchema->bytes = htonl(pColInfo->info.bytes);
    pSchema->colId = htons(pColInfo->info.colId);
    pSchema->type = pColInfo->info.type;

    pStart += sizeof(SSysTableSchema);
  }

  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
  if (len < 0) {
    // Capture terrno before the cleanup calls below can overwrite it, and log the real reason.
    code = terrno;
    dError("failed to retrieve data since %s", tstrerror(code));
    blockDataDestroy(pBlock);
    rpcFreeCont(pRsp);
    return code;
  }

  pRsp->numOfRows = htonl(pBlock->info.rows);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pRsp->completed = 1;
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = size;
  dDebug("dnode variables retrieve completed");

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}
751

UNCOV
752
SArray *dmGetMsgHandles() {
×
UNCOV
753
  int32_t code = -1;
×
UNCOV
754
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
×
UNCOV
755
  if (pArray == NULL) {
×
756
    return NULL;
×
757
  }
758

759
  // Requests handled by DNODE
UNCOV
760
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
761
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
762
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
763
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
764
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
765
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
766
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
767
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
768
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
769
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
770
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
771

772
  // Requests handled by MNODE
UNCOV
773
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
774
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
775
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
776

UNCOV
777
  code = 0;
×
778

UNCOV
779
_OVER:
×
UNCOV
780
  if (code != 0) {
×
781
    taosArrayDestroy(pArray);
×
782
    return NULL;
×
783
  } else {
UNCOV
784
    return pArray;
×
785
  }
786
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc