• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.35
/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "dmInt.h"
19
#include "monitor.h"
20
#include "systable.h"
21
#include "tanalytics.h"
22
#include "tchecksum.h"
23

24
extern SConfig *tsCfg;
25

26
SMonVloadInfo tsVinfo = {0};
27

28
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
3,838✔
29
  int32_t code = 0;
3,838✔
30
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
3,838✔
31
    dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
1,957!
32
    (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
1,957✔
33
    pMgmt->pData->dnodeId = pCfg->dnodeId;
1,957✔
34
    pMgmt->pData->clusterId = pCfg->clusterId;
1,957✔
35
    monSetDnodeId(pCfg->dnodeId);
1,957✔
36
    auditSetDnodeId(pCfg->dnodeId);
1,957✔
37
    code = dmWriteEps(pMgmt->pData);
1,957✔
38
    if (code != 0) {
1,957!
39
      dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
×
40
            tstrerror(code));
41
    }
42
    (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
1,957✔
43
  }
44
}
3,838✔
45

46
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
4,009✔
47
  int32_t code = 0;
4,009✔
48
  dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
4,009✔
49
  if (pMgmt->pData->ipWhiteVer == ver) {
4,009!
50
    if (ver == 0) {
4,009!
51
      dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
4,009✔
52
      if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
4,009!
53
        dError("failed to disable ip white list on dnode");
×
54
      }
55
    }
56
    return;
4,009✔
57
  }
UNCOV
58
  int64_t oldVer = pMgmt->pData->ipWhiteVer;
×
59

UNCOV
60
  SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
×
UNCOV
61
  int32_t             contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
×
UNCOV
62
  if (contLen < 0) {
×
63
    dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
×
64
    return;
×
65
  }
UNCOV
66
  void *pHead = rpcMallocCont(contLen);
×
UNCOV
67
  contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
×
UNCOV
68
  if (contLen < 0) {
×
69
    rpcFreeCont(pHead);
×
70
    dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
×
71
    return;
×
72
  }
73

UNCOV
74
  SRpcMsg rpcMsg = {.pCont = pHead,
×
75
                    .contLen = contLen,
76
                    .msgType = TDMT_MND_RETRIEVE_IP_WHITE,
77
                    .info.ahandle = 0,
78
                    .info.notFreeAhandle = 1,
79
                    .info.refId = 0,
80
                    .info.noResp = 0,
81
                    .info.handle = 0};
UNCOV
82
  SEpSet  epset = {0};
×
83

UNCOV
84
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
85

UNCOV
86
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
UNCOV
87
  if (code != 0) {
×
88
    dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
×
89
  }
90
}
91

92
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
4,009✔
93
  int32_t code = 0;
4,009✔
94
  int64_t oldVer = taosAnalGetVersion();
4,009✔
95
  if (oldVer == newVer) return;
4,009!
96
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
×
97

98
  SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
×
99
  int32_t              contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
×
100
  if (contLen < 0) {
×
101
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
102
    return;
×
103
  }
104

105
  void *pHead = rpcMallocCont(contLen);
×
106
  contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
×
107
  if (contLen < 0) {
×
108
    rpcFreeCont(pHead);
×
109
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
×
110
    return;
×
111
  }
112

113
  SRpcMsg rpcMsg = {
×
114
      .pCont = pHead,
115
      .contLen = contLen,
116
      .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
117
      .info.ahandle = (void *)0x9527,
118
      .info.refId = 0,
119
      .info.noResp = 0,
120
      .info.handle = 0,
121
  };
122
  SEpSet epset = {0};
×
123

124
  (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
×
125

126
  code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
×
127
  if (code != 0) {
×
128
    dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
×
129
  }
130
}
131

132
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
103,758✔
133
  const STraceId *trace = &pRsp->info.traceId;
103,758✔
134
  dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
103,758!
135

136
  if (pRsp->code != 0) {
103,758!
137
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
×
138
      dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
×
139
             pMgmt->statusSeq);
140
      pMgmt->pData->dropped = 1;
×
141
      if (dmWriteEps(pMgmt->pData) != 0) {
×
142
        dError("failed to write dnode file");
×
143
      }
144
      dInfo("dnode will exit since it is in the dropped state");
×
145
      (void)raise(SIGINT);
×
146
    }
147
  } else {
148
    SStatusRsp statusRsp = {0};
103,758✔
149
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
107,767!
150
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
4,009✔
151
      if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
4,009✔
152
        dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
3,838!
153
               statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
154
        pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
3,838✔
155
        dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
3,838✔
156
        dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
3,838✔
157
      }
158
      dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
4,009✔
159
      dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
4,009✔
160
    }
161
    tFreeSStatusRsp(&statusRsp);
103,758✔
162
  }
163
  rpcFreeCont(pRsp->pCont);
103,758✔
164
}
103,758✔
165

166
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
104,216✔
167
  int32_t    code = 0;
104,216✔
168
  SStatusReq req = {0};
104,216✔
169

170
  dDebug("send status req to mnode, statusSeq:%d, begin to mgnt lock", pMgmt->statusSeq);
104,216✔
171
  (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
104,216✔
172
  req.sver = tsVersion;
104,216✔
173
  req.dnodeVer = pMgmt->pData->dnodeVer;
104,216✔
174
  req.dnodeId = pMgmt->pData->dnodeId;
104,216✔
175
  req.clusterId = pMgmt->pData->clusterId;
104,216✔
176
  if (req.clusterId == 0) req.dnodeId = 0;
104,216✔
177
  req.rebootTime = pMgmt->pData->rebootTime;
104,216✔
178
  req.updateTime = pMgmt->pData->updateTime;
104,216✔
179
  req.numOfCores = tsNumOfCores;
104,216✔
180
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
104,216✔
181
  req.numOfDiskCfg = tsDiskCfgNum;
104,216✔
182
  req.memTotal = tsTotalMemoryKB * 1024;
104,216✔
183
  req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024;
104,216✔
184
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
104,216✔
185
  tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
104,216✔
186

187
  req.clusterCfg.statusInterval = tsStatusInterval;
104,216✔
188
  req.clusterCfg.checkTime = 0;
104,216✔
189
  req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
104,216✔
190
  req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
104,216✔
191
  req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
104,216✔
192
  req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
104,216✔
193
  req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
104,216✔
194
  req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
104,216✔
195
  req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
104,216✔
196
  req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
104,216✔
197
  req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
104,216✔
198
  tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
104,216✔
199
  char timestr[32] = "1970-01-01 00:00:00.00";
104,216✔
200
  if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0) != 0) {
104,216!
201
    dError("failed to parse time since %s", tstrerror(code));
×
202
  }
203
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
104,216✔
204
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
104,216✔
205
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
104,216✔
206
  (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
104,216✔
207

208
  dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
104,216✔
209
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
104,216!
210
    dError("failed to lock status info lock");
×
211
    return;
458✔
212
  }
213
  req.pVloads = tsVinfo.pVloads;
104,216✔
214
  tsVinfo.pVloads = NULL;
104,216✔
215
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
104,216!
216
    dError("failed to unlock status info lock");
×
217
    return;
×
218
  }
219

220
  dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
104,216✔
221
  SMonMloadInfo minfo = {0};
104,216✔
222
  (*pMgmt->getMnodeLoadsFp)(&minfo);
104,216✔
223
  req.mload = minfo.load;
104,216✔
224

225
  dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
104,216✔
226
  (*pMgmt->getQnodeLoadsFp)(&req.qload);
104,216✔
227

228
  pMgmt->statusSeq++;
104,216✔
229
  req.statusSeq = pMgmt->statusSeq;
104,216✔
230
  req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
104,216✔
231
  req.analVer = taosAnalGetVersion();
104,216✔
232

233
  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
104,216✔
234
  if (contLen < 0) {
104,216!
235
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
236
    return;
×
237
  }
238

239
  void *pHead = rpcMallocCont(contLen);
104,216✔
240
  contLen = tSerializeSStatusReq(pHead, contLen, &req);
104,216✔
241
  if (contLen < 0) {
104,216!
242
    rpcFreeCont(pHead);
×
243
    dError("failed to serialize status req since %s", tstrerror(contLen));
×
244
    return;
×
245
  }
246
  tFreeSStatusReq(&req);
104,216✔
247

248
  SRpcMsg rpcMsg = {.pCont = pHead,
104,216✔
249
                    .contLen = contLen,
250
                    .msgType = TDMT_MND_STATUS,
251
                    .info.ahandle = 0,
252
                    .info.notFreeAhandle = 1,
253
                    .info.refId = 0,
254
                    .info.noResp = 0,
255
                    .info.handle = 0};
256
  SRpcMsg rpcRsp = {0};
104,216✔
257

258
  dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
104,216✔
259

260
  SEpSet epSet = {0};
104,216✔
261
  int8_t epUpdated = 0;
104,216✔
262
  (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
104,216✔
263

264
  dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
104,216✔
265
  code =
266
      rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
104,216✔
267
  if (code != 0) {
104,216✔
268
    dError("failed to send status req since %s", tstrerror(code));
458!
269
    return;
458✔
270
  }
271

272
  if (rpcRsp.code != 0) {
103,758!
273
    dmRotateMnodeEpSet(pMgmt->pData);
×
274
    char tbuf[512];
275
    dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
×
276
    dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
×
277
  } else {
278
    if (epUpdated == 1) {
103,758✔
279
      dmSetMnodeEpSet(pMgmt->pData, &epSet);
305✔
280
    }
281
  }
282
  dmProcessStatusRsp(pMgmt, &rpcRsp);
103,758✔
283
}
284

285
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
106,466✔
286
  SMonVloadInfo vinfo = {0};
106,466✔
287
  dDebug("begin to get vnode loads");
106,466✔
288
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
106,466✔
289
  dDebug("begin to lock status info");
106,466✔
290
  if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
106,466!
291
    dError("failed to lock status info lock");
×
292
    return;
×
293
  }
294
  if (tsVinfo.pVloads == NULL) {
106,466✔
295
    tsVinfo.pVloads = vinfo.pVloads;
102,615✔
296
    vinfo.pVloads = NULL;
102,615✔
297
  } else {
298
    taosArrayDestroy(vinfo.pVloads);
3,851✔
299
    vinfo.pVloads = NULL;
3,851✔
300
  }
301
  if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
106,466!
302
    dError("failed to unlock status info lock");
×
303
    return;
×
304
  }
305
}
306

307
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
×
308
  int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
×
309
  if (contLen < 0) {
×
310
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
311
    return;
×
312
  }
313
  void *pHead = rpcMallocCont(contLen);
×
314
  contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
×
315
  if (contLen < 0) {
×
316
    rpcFreeCont(pHead);
×
317
    dError("failed to serialize notify req since %s", tstrerror(contLen));
×
318
    return;
×
319
  }
320

321
  SRpcMsg rpcMsg = {.pCont = pHead,
×
322
                    .contLen = contLen,
323
                    .msgType = TDMT_MND_NOTIFY,
324
                    .info.ahandle = 0,
325
                    .info.notFreeAhandle = 1,
326
                    .info.refId = 0,
327
                    .info.noResp = 1,
328
                    .info.handle = 0};
329

330
  SEpSet epSet = {0};
×
331
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
×
332
  if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
×
333
    dError("failed to send notify req");
×
334
  }
335
}
336

337
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
338
  dError("auth rsp is received, but not supported yet");
×
339
  return 0;
×
340
}
341

342
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
343
  dError("grant rsp is received, but not supported yet");
×
344
  return 0;
×
345
}
346

347
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,000✔
348
  int32_t       code = 0;
1,000✔
349
  SDCfgDnodeReq cfgReq = {0};
1,000✔
350
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
1,000!
351
    return TSDB_CODE_INVALID_MSG;
×
352
  }
353

354
  dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
1,000!
355

356
  SConfig *pCfg = taosGetCfg();
1,000✔
357

358
  code = cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true);
1,000✔
359
  if (code != 0) {
1,000✔
360
    if (strncasecmp(cfgReq.config, "resetlog", strlen("resetlog")) == 0) {
2!
361
      code = 0;
2✔
362
    } else {
363
      return code;
×
364
    }
365
  }
366

367
  return taosCfgDynamicOptions(pCfg, cfgReq.config, true);
1,000✔
368
}
369

370
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1✔
371
#ifdef TD_ENTERPRISE
372
  int32_t       code = 0;
1✔
373
  SDCfgDnodeReq cfgReq = {0};
1✔
374
  if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
1!
375
    code = TSDB_CODE_INVALID_MSG;
×
376
    goto _exit;
×
377
  }
378

379
  code = dmUpdateEncryptKey(cfgReq.value, true);
1✔
380
  if (code == 0) {
1!
381
    tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
1✔
382
    tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
1✔
383
    tstrncpy(tsEncryptKey, cfgReq.value, ENCRYPT_KEY_LEN + 1);
1✔
384
  }
385

386
_exit:
×
387
  pMsg->code = code;
1✔
388
  pMsg->info.rsp = NULL;
1✔
389
  pMsg->info.rspLen = 0;
1✔
390
  return code;
1✔
391
#else
392
  return 0;
393
#endif
394
}
395

396
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
2✔
397
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
2✔
398
  pStatus->details[0] = 0;
2✔
399

400
  SMonMloadInfo minfo = {0};
2✔
401
  (*pMgmt->getMnodeLoadsFp)(&minfo);
2✔
402
  if (minfo.isMnode &&
2!
403
      (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
2!
404
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
405
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
×
406
    return;
×
407
  }
408

409
  SMonVloadInfo vinfo = {0};
2✔
410
  (*pMgmt->getVnodeLoadsFp)(&vinfo);
2✔
411
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
6✔
412
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
4✔
413
    if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
4!
414
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
×
415
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
×
416
               syncStr(pLoad->syncState));
×
417
      break;
×
418
    }
419
  }
420

421
  taosArrayDestroy(vinfo.pVloads);
2✔
422
}
423

424
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2✔
425
  int32_t code = 0;
2✔
426
  dDebug("server run status req is received");
2!
427
  SServerStatusRsp statusRsp = {0};
2✔
428
  dmGetServerRunStatus(pMgmt, &statusRsp);
2✔
429

430
  pMsg->info.rsp = NULL;
2✔
431
  pMsg->info.rspLen = 0;
2✔
432

433
  SRpcMsg rspMsg = {.info = pMsg->info};
2✔
434
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
2✔
435
  if (rspLen < 0) {
2!
436
    return TSDB_CODE_OUT_OF_MEMORY;
×
437
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
438
    // return rspMsg.code;
439
  }
440

441
  void *pRsp = rpcMallocCont(rspLen);
2✔
442
  if (pRsp == NULL) {
2!
443
    return terrno;
×
444
    // rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
445
    // return rspMsg.code;
446
  }
447

448
  rspLen = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
2✔
449
  if (rspLen < 0) {
2!
450
    return TSDB_CODE_INVALID_MSG;
×
451
  }
452

453
  pMsg->info.rsp = pRsp;
2✔
454
  pMsg->info.rspLen = rspLen;
2✔
455
  return 0;
2✔
456
}
457

458
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
3,698✔
459
  int32_t code = 0;
3,698✔
460

461
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
3,698✔
462
  if (pBlock == NULL) {
3,698!
463
    return terrno;
×
464
  }
465

466
  size_t size = 0;
3,698✔
467

468
  const SSysTableMeta *pMeta = NULL;
3,698✔
469
  getInfosDbMeta(&pMeta, &size);
3,698✔
470

471
  int32_t index = 0;
3,698✔
472
  for (int32_t i = 0; i < size; ++i) {
66,564!
473
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
66,564✔
474
      index = i;
3,698✔
475
      break;
3,698✔
476
    }
477
  }
478

479
  pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
3,698✔
480
  if (pBlock->pDataBlock == NULL) {
3,698!
481
    code = terrno;
×
482
    goto _exit;
×
483
  }
484

485
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
22,188✔
486
    SColumnInfoData colInfoData = {0};
18,490✔
487
    colInfoData.info.colId = i + 1;
18,490✔
488
    colInfoData.info.type = pMeta[index].schema[i].type;
18,490✔
489
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
18,490✔
490
    if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
36,980!
491
      code = terrno;
×
492
      goto _exit;
×
493
    }
494
  }
495

496
  pBlock->info.hasVarCol = true;
3,698✔
497
_exit:
3,698✔
498
  if (code != 0) {
3,698!
499
    blockDataDestroy(pBlock);
×
500
  } else {
501
    *ppBlock = pBlock;
3,698✔
502
  }
503
  return code;
3,698✔
504
}
505

506
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
3,698✔
507
  int32_t code = dumpConfToDataBlock(pBlock, 1);
3,698✔
508
  if (code != 0) {
3,698!
509
    return code;
×
510
  }
511

512
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
3,698✔
513
  if (pColInfo == NULL) {
3,698!
514
    return TSDB_CODE_OUT_OF_RANGE;
×
515
  }
516

517
  return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
3,698✔
518
}
519

520
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,698✔
521
  int32_t           size = 0;
3,698✔
522
  int32_t           rowsRead = 0;
3,698✔
523
  int32_t           code = 0;
3,698✔
524
  SRetrieveTableReq retrieveReq = {0};
3,698✔
525
  if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
3,698!
526
    return TSDB_CODE_INVALID_MSG;
×
527
  }
528
#if 0
529
  if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
530
    code = TSDB_CODE_MND_NO_RIGHTS;
531
    return code;
532
  }
533
#endif
534
  if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
3,698!
535
    return TSDB_CODE_INVALID_MSG;
×
536
  }
537

538
  SSDataBlock *pBlock = NULL;
3,698✔
539
  if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
3,698!
540
    return code;
×
541
  }
542

543
  code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
3,698✔
544
  if (code != 0) {
3,698!
545
    blockDataDestroy(pBlock);
×
546
    return code;
×
547
  }
548

549
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
3,698✔
550
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
3,698✔
551
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
3,698✔
552

553
  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
3,698✔
554
  if (pRsp == NULL) {
3,698!
555
    code = terrno;
×
556
    dError("failed to retrieve data since %s", tstrerror(code));
×
557
    blockDataDestroy(pBlock);
×
558
    return code;
×
559
  }
560

561
  char *pStart = pRsp->data;
3,698✔
562
  *(int32_t *)pStart = htonl(numOfCols);
3,698✔
563
  pStart += sizeof(int32_t);  // number of columns
3,698✔
564

565
  for (int32_t i = 0; i < numOfCols; ++i) {
22,188✔
566
    SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
18,490✔
567
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
18,490✔
568

569
    pSchema->bytes = htonl(pColInfo->info.bytes);
18,490✔
570
    pSchema->colId = htons(pColInfo->info.colId);
18,490✔
571
    pSchema->type = pColInfo->info.type;
18,490✔
572

573
    pStart += sizeof(SSysTableSchema);
18,490✔
574
  }
575

576
  int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
3,698✔
577
  if (len < 0) {
3,698!
578
    dError("failed to retrieve data since %s", tstrerror(code));
×
579
    blockDataDestroy(pBlock);
×
580
    rpcFreeCont(pRsp);
×
581
    return terrno;
×
582
  }
583

584
  pRsp->numOfRows = htonl(pBlock->info.rows);
3,698✔
585
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
3,698✔
586
  pRsp->completed = 1;
3,698✔
587
  pMsg->info.rsp = pRsp;
3,698✔
588
  pMsg->info.rspLen = size;
3,698✔
589
  dDebug("dnode variables retrieve completed");
3,698!
590

591
  blockDataDestroy(pBlock);
3,698✔
592
  return TSDB_CODE_SUCCESS;
3,698✔
593
}
594

595
SArray *dmGetMsgHandles() {
2,440✔
596
  int32_t code = -1;
2,440✔
597
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
2,440✔
598
  if (pArray == NULL) {
2,440!
599
    return NULL;
×
600
  }
601

602
  // Requests handled by DNODE
603
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
604
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
605
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
606
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
607
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
608
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
609
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
610
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
611
  if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
612
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
613
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
614

615
  // Requests handled by MNODE
616
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
617
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
618
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,440!
619

620
  code = 0;
2,440✔
621

622
_OVER:
2,440✔
623
  if (code != 0) {
2,440!
624
    taosArrayDestroy(pArray);
×
625
    return NULL;
×
626
  } else {
627
    return pArray;
2,440✔
628
  }
629
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc