• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wail 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.52
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
2,439✔
22
  SDnodeMgmt *pMgmt = param;
2,439✔
23
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
24
  setThreadName("dnode-status");
2,439✔
25

26
  while (1) {
443,014✔
27
    taosMsleep(200);
445,453✔
28
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
445,453!
29

30
    int64_t curTime = taosGetTimestampMs();
443,014✔
31
    if (curTime < lastTime) lastTime = curTime;
443,014!
32
    float interval = (curTime - lastTime) / 1000.0f;
443,014✔
33
    if (interval >= tsStatusInterval) {
443,014✔
34
      dmSendStatusReq(pMgmt);
88,377✔
35
      lastTime = curTime;
88,377✔
36
    }
37
  }
38

39
  return NULL;
2,439✔
40
}
41

42
static void *dmConfigThreadFp(void *param) {
2,439✔
43
  SDnodeMgmt *pMgmt = param;
2,439✔
44
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
45
  setThreadName("dnode-config");
2,439✔
46
  while (1) {
12,297✔
47
    taosMsleep(200);
14,736✔
48
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
14,736!
49

50
    int64_t curTime = taosGetTimestampMs();
12,297✔
51
    if (curTime < lastTime) lastTime = curTime;
12,297!
52
    float interval = (curTime - lastTime) / 1000.0f;
12,297✔
53
    if (interval >= tsStatusInterval) {
12,297✔
54
      dmSendConfigReq(pMgmt);
2,492✔
55
      lastTime = curTime;
2,492✔
56
    }
57
  }
58
  return NULL;
2,439✔
59
}
60

61
static void *dmStatusInfoThreadFp(void *param) {
2,439✔
62
  SDnodeMgmt *pMgmt = param;
2,439✔
63
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
64
  setThreadName("dnode-status-info");
2,439✔
65

66
  int32_t upTimeCount = 0;
2,439✔
67
  int64_t upTime = 0;
2,439✔
68

69
  while (1) {
462,063✔
70
    taosMsleep(200);
464,502✔
71
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
464,502!
72

73
    int64_t curTime = taosGetTimestampMs();
462,063✔
74
    if (curTime < lastTime) lastTime = curTime;
462,063!
75
    float interval = (curTime - lastTime) / 1000.0f;
462,063✔
76
    if (interval >= tsStatusInterval) {
462,063✔
77
      dmUpdateStatusInfo(pMgmt);
91,554✔
78
      lastTime = curTime;
91,554✔
79

80
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
91,554✔
81
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
744✔
82
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
744✔
83
      }
84
    }
85
  }
86

87
  return NULL;
2,439✔
88
}
89

90
#if defined(TD_ENTERPRISE)
91
SDmNotifyHandle dmNotifyHdl = {.state = 0};
92
#define TIMESERIES_STASH_NUM 5
93
static void *dmNotifyThreadFp(void *param) {
2,439✔
94
  SDnodeMgmt *pMgmt = param;
2,439✔
95
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
96
  setThreadName("dnode-notify");
2,439✔
97

98
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
2,439!
99
    return NULL;
×
100
  }
101

102
  // calculate approximate timeSeries per second
103
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
104
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
105
  int64_t  approximateTimeSeries = 0;
2,439✔
106
  uint64_t nTotalNotify = 0;
2,439✔
107
  int32_t  head, tail = 0;
2,439✔
108

109
  bool       wait = true;
2,439✔
110
  int32_t    nDnode = 0;
2,439✔
111
  int64_t    lastNotify = 0;
2,439✔
112
  int64_t    lastFetchDnode = 0;
2,439✔
113
  SNotifyReq req = {0};
2,439✔
114
  while (1) {
88,204✔
115
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
90,643!
116
    if (wait) tsem_wait(&dmNotifyHdl.sem);
88,204✔
117
    atomic_store_8(&dmNotifyHdl.state, 1);
88,204✔
118

119
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
88,204✔
120
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
88,204!
121
      goto _skip;
88,199✔
122
    }
123
    int64_t current = taosGetTimestampMs();
5✔
124
    if (current - lastFetchDnode > 1000) {
5!
125
      nDnode = dmGetDnodeSize(pMgmt->pData);
5✔
126
      if (nDnode < 1) nDnode = 1;
5!
127
      lastFetchDnode = current;
5✔
128
    }
129
    if (req.dnodeId == 0 || req.clusterId == 0) {
5!
130
      req.dnodeId = pMgmt->pData->dnodeId;
5✔
131
      req.clusterId = pMgmt->pData->clusterId;
5✔
132
    }
133

134
    if (current - lastNotify < 10) {
5!
135
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
136
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
137
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
138
        taosMsleep(10);
×
139
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
140
        taosMsleep(5);
×
141
      } else {
142
        taosMsleep(2);
×
143
      }
144
    }
145

146
    SMonVloadInfo vinfo = {0};
5✔
147
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
5✔
148
    req.pVloads = vinfo.pVloads;
5✔
149
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
5✔
150
    int64_t nTimeSeries = 0;
5✔
151
    for (int32_t i = 0; i < nVgroup; ++i) {
5!
152
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
153
      nTimeSeries += vload->nTimeSeries;
×
154
    }
155
    notifyTimeSeries[tail] = nTimeSeries;
5✔
156
    notifyTimeStamp[tail] = taosGetTimestampNs();
5✔
157
    ++nTotalNotify;
5✔
158

159
    approximateTimeSeries = 0;
5✔
160
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
5!
161
      head = tail - TIMESERIES_STASH_NUM + 1;
×
162
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
163
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
164
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
165
      if (tsDiff > 0) {
×
166
        if (timeDiff > 0 && timeDiff < 1e9) {
×
167
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
168
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
169
            dmSendNotifyReq(pMgmt, &req);
×
170
          }
171
        } else {
172
          dmSendNotifyReq(pMgmt, &req);
×
173
        }
174
      }
175
    } else {
176
      dmSendNotifyReq(pMgmt, &req);
5✔
177
    }
178
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
5!
179

180
    tFreeSNotifyReq(&req);
5✔
181
    lastNotify = taosGetTimestampMs();
5✔
182
  _skip:
88,204✔
183
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
88,204✔
184
      wait = true;
88,169✔
185
      continue;
88,169✔
186
    }
187
    wait = false;
35✔
188
  }
189

190
  return NULL;
2,439✔
191
}
192
#endif
193

194
#ifdef USE_MONITOR
195
static void *dmMonitorThreadFp(void *param) {
2,439✔
196
  SDnodeMgmt *pMgmt = param;
2,439✔
197
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
198
  int64_t     lastTimeForBasic = taosGetTimestampMs();
2,439✔
199
  setThreadName("dnode-monitor");
2,439✔
200

201
  static int32_t TRIM_FREQ = 20;
202
  int32_t        trimCount = 0;
2,439✔
203

204
  while (1) {
462,342✔
205
    taosMsleep(200);
464,781✔
206
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
464,781!
207

208
    int64_t curTime = taosGetTimestampMs();
462,342✔
209

210
    if (curTime < lastTime) lastTime = curTime;
462,342!
211
    float interval = (curTime - lastTime) / 1000.0f;
462,342✔
212
    if (interval >= tsMonitorInterval) {
462,342✔
213
      (*pMgmt->sendMonitorReportFp)();
2,220✔
214
      (*pMgmt->monitorCleanExpiredSamplesFp)();
2,220✔
215
      lastTime = curTime;
2,220✔
216

217
      trimCount = (trimCount + 1) % TRIM_FREQ;
2,220✔
218
      if (trimCount == 0) {
2,220✔
219
        taosMemoryTrim(0, NULL);
1!
220
      }
221
    }
222
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
462,342✔
223
      taosMemoryTrim(0, NULL);
16,910!
224
    }
225
  }
226

227
  return NULL;
2,439✔
228
}
229
#endif
230
#ifdef USE_AUDIT
231
static void *dmAuditThreadFp(void *param) {
2,439✔
232
  SDnodeMgmt *pMgmt = param;
2,439✔
233
  int64_t     lastTime = taosGetTimestampMs();
2,439✔
234
  setThreadName("dnode-audit");
2,439✔
235

236
  while (1) {
925,668✔
237
    taosMsleep(100);
928,107✔
238
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
928,107!
239

240
    int64_t curTime = taosGetTimestampMs();
925,668✔
241
    if (curTime < lastTime) lastTime = curTime;
925,668!
242
    float interval = curTime - lastTime;
925,668✔
243
    if (interval >= tsAuditInterval) {
925,668✔
244
      (*pMgmt->sendAuditRecordsFp)();
17,328✔
245
      lastTime = curTime;
17,328✔
246
    }
247
  }
248

249
  return NULL;
2,439✔
250
}
251
#endif
252
#ifdef USE_REPORT
253
static void *dmCrashReportThreadFp(void *param) {
×
254
  int32_t     code = 0;
×
255
  SDnodeMgmt *pMgmt = param;
×
256
  int64_t     lastTime = taosGetTimestampMs();
×
257
  setThreadName("dnode-crashReport");
×
258
  char filepath[PATH_MAX] = {0};
×
259
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
260
  char     *pMsg = NULL;
×
261
  int64_t   msgLen = 0;
×
262
  TdFilePtr pFile = NULL;
×
263
  bool      truncateFile = false;
×
264
  int32_t   sleepTime = 200;
×
265
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
266
  int32_t   loopTimes = reportPeriodNum;
×
267

268
  STelemAddrMgmt mgt = {0};
×
269
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
270
  if (code != 0) {
×
271
    dError("failed to init telemetry since %s", tstrerror(code));
×
272
    return NULL;
×
273
  }
274
  code = initCrashLogWriter();
×
275
  if (code != 0) {
×
276
    dError("failed to init crash log writer since %s", tstrerror(code));
×
277
    return NULL;
×
278
  }
279

280
  while (1) {
281
    checkAndPrepareCrashInfo();
×
282
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
283
      break;
×
284
    }
285
    if (loopTimes++ < reportPeriodNum) {
×
286
      taosMsleep(sleepTime);
×
NEW
287
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
288
      continue;
×
289
    }
290
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
291
    if (pMsg && msgLen > 0) {
×
292
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
293
        dError("failed to send crash report");
×
294
        if (pFile) {
×
295
          taosReleaseCrashLogFile(pFile, false);
×
296
          pFile = NULL;
×
297

298
          taosMsleep(sleepTime);
×
299
          loopTimes = 0;
×
300
          continue;
×
301
        }
302
      } else {
303
        dInfo("succeed to send crash report");
×
304
        truncateFile = true;
×
305
      }
306
    } else {
307
      dInfo("no crash info was found");
×
308
    }
309

310
    taosMemoryFree(pMsg);
×
311

312
    if (pMsg && msgLen > 0) {
×
313
      pMsg = NULL;
×
314
      continue;
×
315
    }
316

317
    if (pFile) {
×
318
      taosReleaseCrashLogFile(pFile, truncateFile);
×
319
      pFile = NULL;
×
320
      truncateFile = false;
×
321
    }
322

323
    taosMsleep(sleepTime);
×
324
    loopTimes = 0;
×
325
  }
326
  taosTelemetryDestroy(&mgt);
×
327

328
  return NULL;
×
329
}
330
#endif
331

332
// Creates the joinable thread that runs dmStatusThreadFp and stores its handle
// in pMgmt->statusThread; pMgmt is also passed to the thread as its argument.
// On TD_COMPACT_OS builds the thread gets a STACK_SIZE_SMALL stack.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
350

351
// Creates the joinable thread that runs dmConfigThreadFp and stores its handle
// in pMgmt->configThread; pMgmt is also passed to the thread as its argument.
// On TD_COMPACT_OS builds the thread gets a STACK_SIZE_SMALL stack.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("config-status", "initialized");
  return 0;
}
369

370
// Creates the joinable thread that runs dmStatusInfoThreadFp and stores its
// handle in pMgmt->statusInfoThread; pMgmt is also passed to the thread as its
// argument. On TD_COMPACT_OS builds the thread gets a STACK_SIZE_SMALL stack.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
388

389
// Joins the status thread and clears its handle. Does nothing when
// pMgmt->statusThread is not a valid thread handle.
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
2,439✔
395

396
// Joins the config thread and clears its handle. Does nothing when
// pMgmt->configThread is not a valid thread handle.
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
2,439✔
402

403
// Joins the status-info thread and clears its handle. Does nothing when
// pMgmt->statusInfoThread is not a valid thread handle.
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
2,439✔
409
#ifdef TD_ENTERPRISE
410
// Creates the joinable thread that runs dmNotifyThreadFp and stores its handle
// in pMgmt->notifyThread; pMgmt is also passed to the thread as its argument.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
425

426
// Stops the notify thread: when pMgmt->notifyThread is a valid handle, posts
// dmNotifyHdl.sem (so a thread blocked in tsem_wait can run and observe the
// stop flags), joins the thread and clears the handle. The semaphore is then
// destroyed regardless of whether a thread was joined. Failures of the
// semaphore calls are only logged.
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
    int postRc = tsem_post(&dmNotifyHdl.sem);
    if (postRc != 0) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  int destroyRc = tsem_destroy(&dmNotifyHdl.sem);
  if (destroyRc != 0) {
    dError("failed to destroy notify sem");
  }
}
2,439✔
439
#endif
440
// Creates the joinable thread that runs dmMonitorThreadFp and stores its handle
// in pMgmt->monitorThread; pMgmt is also passed to the thread as its argument.
// Compiled to a no-op returning 0 when USE_MONITOR is not defined.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
457

458
// Creates the joinable thread that runs dmAuditThreadFp and stores its handle
// in pMgmt->auditThread; pMgmt is also passed to the thread as its argument.
// Compiled to a no-op returning 0 when USE_AUDIT is not defined.
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
475

476
// Joins the monitor thread and clears its handle. Does nothing when
// pMgmt->monitorThread is not a valid thread handle, or when USE_MONITOR is
// not defined.
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
2,439✔
484

485
// Joins the audit thread and clears its handle. Does nothing when
// pMgmt->auditThread is not a valid thread handle, or when USE_AUDIT is not
// defined.
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
2,439✔
493

494
// Creates the joinable thread that runs dmCrashReportThreadFp and stores its
// handle in pMgmt->crashReportThread; pMgmt is also passed to the thread as its
// argument. Nothing is started when tsEnableCrashReport is false, and the
// function compiles to a no-op returning 0 when USE_REPORT is not defined.
//
// Returns 0 on success (or when nothing was started), or
// TAOS_SYSTEM_ERROR(ERRNO) when the thread could not be created. The thread
// attribute object is destroyed on both paths.
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture the error before any other call can change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // Always pair taosThreadAttrInit with a destroy, including on failure.
  (void)taosThreadAttrDestroy(&thAttr);

  if (code != 0) {
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
515

516
// Joins the crash-report thread and clears its handle. Does nothing when
// tsEnableCrashReport is false, when pMgmt->crashReportThread is not a valid
// thread handle, or when USE_REPORT is not defined.
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
#endif
}
528

529
// Worker callback for the dnode management queue.
//
// Dispatches pMsg by msgType to the matching handler, and for request messages
// sends an RPC response carrying the handler's result (replaced by terrno when
// the handler failed and terrno is set). Unknown message types are answered
// with TSDB_CODE_MSG_NOT_PROCESSED. The message body and the queue item are
// always freed before returning.
//
// pInfo->ahandle is the SDnodeMgmt the queue belongs to.
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  STraceId   *trace = &pMsg->info.traceId;  // NOTE(review): presumably read by the dG* log macros - confirm before renaming
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    // Handlers implemented by the dnode management module itself.
    case TDMT_DND_CONFIG_DNODE: {
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    }
    case TDMT_MND_AUTH_RSP: {
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    }
    case TDMT_MND_GRANT_RSP: {
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    }

    // Node create/drop requests are forwarded through the pMgmt callbacks.
    case TDMT_DND_CREATE_MNODE: {
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    }
    case TDMT_DND_DROP_MNODE: {
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    }
    case TDMT_DND_CREATE_QNODE: {
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    }
    case TDMT_DND_DROP_QNODE: {
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    }
    case TDMT_DND_CREATE_SNODE: {
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    }
    case TDMT_DND_DROP_SNODE: {
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;
    }
    case TDMT_DND_CREATE_BNODE: {
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
      break;
    }
    case TDMT_DND_DROP_BNODE: {
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
      break;
    }
    case TDMT_DND_ALTER_MNODE_TYPE: {
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;
    }

    case TDMT_DND_SERVER_STATUS: {
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    }
    case TDMT_DND_SYSTABLE_RETRIEVE: {
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
    }
    case TDMT_MND_GRANT: {
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    }
    case TDMT_MND_GRANT_NOTIFY: {
      code = dmProcessGrantNotify(NULL, pMsg);
      break;
    }
    case TDMT_DND_CREATE_ENCRYPT_KEY: {
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;
    }

    default: {
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
    }
  }

  if (IsReq(pMsg)) {
    // Prefer the thread's terrno over the handler's generic failure code.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg response = {
        .code = code,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
        .info = pMsg->info,
    };

    // From here on `code` holds the send result, which is what the trace
    // line below prints for request messages.
    code = rpcSendResponse(&response);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
160,765✔
612

613
// Initialises pMgmt->mgmtWorker as a single worker (min = max = 1) named
// "dnode-mgmt" whose items are handled by dmProcessMgmtQueue with pMgmt as the
// handler parameter.
//
// Returns 0 on success, or the error code from tSingleWorkerInit.
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg workerCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &workerCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
630

631
// Cleans up the management worker held in pMgmt->mgmtWorker and logs that the
// dnode workers are closed.
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
  tSingleWorkerCleanup(pWorker);
  dDebug("dnode workers are closed");
}
2,439✔
635

636
// Enqueues pMsg on the management worker's queue.
//
// Returns whatever taosWriteQitem returns for the write.
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);
  return taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc