
taosdata / TDengine / #4808

16 Oct 2025 11:40AM UTC coverage: 57.938% (-0.6%) from 58.524%

Build #4808 · push · travis-ci · web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

137662 of 303532 branches covered (45.35%)

Branch coverage included in aggregate %.

209234 of 295200 relevant lines covered (70.88%)

4035326.15 hits per line

Source File

/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c (file coverage: 56.4%)
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21

22
static void *dmStatusThreadFp(void *param) {
1,808✔
23
  SDnodeMgmt *pMgmt = param;
1,808✔
24
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
25
  setThreadName("dnode-status");
1,808✔
26

27
  while (1) {
1,722,984✔
28
    taosMsleep(50);
1,724,792✔
29
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,724,792!
30

31
    int64_t curTime = taosGetTimestampMs();
1,722,984✔
32
    if (curTime < lastTime) lastTime = curTime;
1,722,984!
33
    float interval = curTime - lastTime;
1,722,984✔
34
    if (interval >= tsStatusIntervalMs) {
1,722,984✔
35
      dmSendStatusReq(pMgmt);
85,841✔
36
      lastTime = curTime;
85,841✔
37
    }
38
  }
39

40
  return NULL;
1,808✔
41
}
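
The status thread above, and the config, status-info, audit, and metrics threads later in this file, all follow the same polling shape: sleep a short tick, exit once the dnode is dropped or stopped, and fire the periodic work only after a configured interval has elapsed; the curTime < lastTime check merely resets the baseline if the wall clock jumps backwards. A minimal standalone sketch of that pattern, with illustrative names (poll_loop, now_ms, and do_work are not TDengine APIs):

#include <stdbool.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>

/* stand-in for taosGetTimestampMs() */
static int64_t now_ms(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

static void poll_loop(volatile bool *stopped, int64_t interval_ms, void (*do_work)(void)) {
  int64_t last = now_ms();
  while (!*stopped) {
    usleep(50 * 1000);                /* 50 ms tick, like taosMsleep(50) */
    int64_t cur = now_ms();
    if (cur < last) last = cur;       /* wall clock moved backwards: reset baseline */
    if (cur - last >= interval_ms) {  /* interval elapsed: fire the periodic work */
      do_work();
      last = cur;
    }
  }
}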
42

43
static void *dmConfigThreadFp(void *param) {
1,808✔
44
  SDnodeMgmt *pMgmt = param;
1,808✔
45
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
46
  setThreadName("dnode-config");
1,808✔
47
  while (1) {
36,108✔
48
    taosMsleep(50);
37,916✔
49
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
37,916!
50

51
    int64_t curTime = taosGetTimestampMs();
36,108✔
52
    if (curTime < lastTime) lastTime = curTime;
36,108!
53
    float interval = curTime - lastTime;
36,108✔
54
    if (interval >= tsStatusIntervalMs) {
36,108✔
55
      dmSendConfigReq(pMgmt);
1,790✔
56
      lastTime = curTime;
1,790✔
57
    }
58
  }
59
  return NULL;
1,808✔
60
}
61

62
static void *dmStatusInfoThreadFp(void *param) {
1,808✔
63
  SDnodeMgmt *pMgmt = param;
1,808✔
64
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
65
  setThreadName("dnode-status-info");
1,808✔
66

67
  int32_t upTimeCount = 0;
1,808✔
68
  int64_t upTime = 0;
1,808✔
69

70
  while (1) {
1,770,124✔
71
    taosMsleep(50);
1,771,932✔
72
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,771,932!
73

74
    int64_t curTime = taosGetTimestampMs();
1,770,124✔
75
    if (curTime < lastTime) lastTime = curTime;
1,770,124!
76
    float interval = curTime - lastTime;
1,770,124✔
77
    if (interval >= tsStatusIntervalMs) {
1,770,124✔
78
      dmUpdateStatusInfo(pMgmt);
87,751✔
79
      lastTime = curTime;
87,751✔
80

81
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
87,751✔
82
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
754✔
83
        if (upTime > 0) tsDndUpTime = upTime;
754!
84
      }
85
    }
86
  }
87

88
  return NULL;
1,808✔
89
}
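
The (upTimeCount + 1) & 63 expression above throttles the OS-uptime refresh to once every 64 status-info updates: masking with 63 is the same as taking the counter modulo 64, because 64 is a power of two. A small self-contained illustration (counter and refreshes are illustrative names, not from the source):

#include <assert.h>
#include <stdint.h>

int main(void) {
  int32_t counter = 0;
  int     refreshes = 0;
  for (int i = 0; i < 256; ++i) {
    /* same idiom as dmStatusInfoThreadFp: increment and wrap at 64 via a mask,
       since (x & 63) == (x % 64) for non-negative x */
    if ((counter = ((counter + 1) & 63)) == 0) {
      ++refreshes;                    /* the "refresh uptime" branch */
    }
  }
  assert(refreshes == 4);             /* 256 ticks / 64 = 4 refreshes */
  return 0;
}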
90

91
#if defined(TD_ENTERPRISE)
92
SDmNotifyHandle dmNotifyHdl = {.state = 0};
93
#define TIMESERIES_STASH_NUM 5
94
static void *dmNotifyThreadFp(void *param) {
1,808✔
95
  SDnodeMgmt *pMgmt = param;
1,808✔
96
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
97
  setThreadName("dnode-notify");
1,808✔
98

99
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
1,808!
100
    return NULL;
×
101
  }
102

103
  // calculate approximate timeSeries per second
104
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
105
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
106
  int64_t  approximateTimeSeries = 0;
1,808✔
107
  uint64_t nTotalNotify = 0;
1,808✔
108
  int32_t  head, tail = 0;
1,808✔
109

110
  bool       wait = true;
1,808✔
111
  int32_t    nDnode = 0;
1,808✔
112
  int64_t    lastNotify = 0;
1,808✔
113
  int64_t    lastFetchDnode = 0;
1,808✔
114
  SNotifyReq req = {0};
1,808✔
115
  while (1) {
104,008✔
116
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
105,816!
117
    if (wait) tsem_wait(&dmNotifyHdl.sem);
104,008✔
118
    atomic_store_8(&dmNotifyHdl.state, 1);
104,008✔
119

120
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
104,008✔
121
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
104,008!
122
      goto _skip;
104,008✔
123
    }
124
    int64_t current = taosGetTimestampMs();
×
125
    if (current - lastFetchDnode > 1000) {
×
126
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
127
      if (nDnode < 1) nDnode = 1;
×
128
      lastFetchDnode = current;
×
129
    }
130
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
131
      req.dnodeId = pMgmt->pData->dnodeId;
×
132
      req.clusterId = pMgmt->pData->clusterId;
×
133
    }
134

135
    if (current - lastNotify < 10) {
×
136
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
137
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
138
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
139
        taosMsleep(10);
×
140
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
141
        taosMsleep(5);
×
142
      } else {
143
        taosMsleep(2);
×
144
      }
145
    }
146

147
    SMonVloadInfo vinfo = {0};
×
148
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
149
    req.pVloads = vinfo.pVloads;
×
150
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
151
    int64_t nTimeSeries = 0;
×
152
    for (int32_t i = 0; i < nVgroup; ++i) {
×
153
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
154
      nTimeSeries += vload->nTimeSeries;
×
155
    }
156
    notifyTimeSeries[tail] = nTimeSeries;
×
157
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
158
    ++nTotalNotify;
×
159

160
    approximateTimeSeries = 0;
×
161
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
162
      head = tail - TIMESERIES_STASH_NUM + 1;
×
163
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
164
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
165
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
166
      if (tsDiff > 0) {
×
167
        if (timeDiff > 0 && timeDiff < 1e9) {
×
168
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
169
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
170
            dmSendNotifyReq(pMgmt, &req);
×
171
          }
172
        } else {
173
          dmSendNotifyReq(pMgmt, &req);
×
174
        }
175
      }
176
    } else {
177
      dmSendNotifyReq(pMgmt, &req);
×
178
    }
179
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
180

181
    tFreeSNotifyReq(&req);
×
182
    lastNotify = taosGetTimestampMs();
×
183
  _skip:
104,008✔
184
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
104,008✔
185
      wait = true;
103,966✔
186
      continue;
103,966✔
187
    }
188
    wait = false;
42✔
189
  }
190

191
  return NULL;
1,808✔
192
}
193
#endif
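
The enterprise notify thread keeps the last TIMESERIES_STASH_NUM (timestamp, time-series count) samples in a small ring buffer and estimates the per-second growth rate from the newest and oldest entries; only when the projected cluster-wide rate threatens the remaining grant does it send a notify request early. A hedged, standalone sketch of just the ring-buffer rate estimate (SRateStash and stash_push are illustrative names, and the degenerate-window fallback that always notifies is omitted):

#include <stdint.h>

#define STASH_NUM 5  /* mirrors TIMESERIES_STASH_NUM above */

typedef struct {
  int64_t  ts_ns[STASH_NUM];   /* sample timestamps in nanoseconds */
  int64_t  count[STASH_NUM];   /* cumulative time-series count at each sample */
  uint64_t total;              /* samples pushed so far */
  int32_t  tail;               /* slot to write next */
} SRateStash;

/* Push one sample and return an approximate growth rate per second,
   or 0 while the window is not yet full or is degenerate. */
static int64_t stash_push(SRateStash *s, int64_t ts_ns, int64_t count) {
  s->count[s->tail] = count;
  s->ts_ns[s->tail] = ts_ns;
  ++s->total;

  int64_t rate = 0;
  if (s->total >= STASH_NUM) {
    int32_t head = s->tail - STASH_NUM + 1;
    if (head < 0) head += STASH_NUM;             /* wrap around to the oldest sample */
    int64_t dt = s->ts_ns[s->tail] - s->ts_ns[head];
    int64_t dc = s->count[s->tail] - s->count[head];
    if (dc > 0 && dt > 0 && dt < 1000000000LL) { /* window shorter than one second */
      rate = (int64_t)((double)dc * 1e9 / dt);
    }
  }
  if (++s->tail == STASH_NUM) s->tail = 0;
  return rate;
}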
194

195
#ifdef USE_MONITOR
196
static void *dmMonitorThreadFp(void *param) {
1,808✔
197
  SDnodeMgmt *pMgmt = param;
1,808✔
198
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
199
  int64_t     lastTimeForBasic = taosGetTimestampMs();
1,808✔
200
  setThreadName("dnode-monitor");
1,808✔
201

202
  static int32_t TRIM_FREQ = 20;
203
  int32_t        trimCount = 0;
1,808✔
204

205
  while (1) {
442,855✔
206
    taosMsleep(200);
444,663✔
207
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
444,663!
208

209
    int64_t curTime = taosGetTimestampMs();
442,855✔
210

211
    if (curTime < lastTime) lastTime = curTime;
442,855!
212
    float interval = (curTime - lastTime) / 1000.0f;
442,855✔
213
    if (interval >= tsMonitorInterval) {
442,855✔
214
      (*pMgmt->sendMonitorReportFp)();
2,271✔
215
      (*pMgmt->monitorCleanExpiredSamplesFp)();
2,271✔
216
      lastTime = curTime;
2,271✔
217

218
      trimCount = (trimCount + 1) % TRIM_FREQ;
2,271✔
219
      if (trimCount == 0) {
2,271!
220
        taosMemoryTrim(0, NULL);
×
221
      }
222
    }
223
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
442,855✔
224
      taosMemoryTrim(0, NULL);
16,534!
225
    }
226
  }
227

228
  return NULL;
1,808✔
229
}
230
#endif
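
The monitor thread above trims heap memory on two triggers: once every TRIM_FREQ monitor reports, and whenever some other component has raised the tsNeedTrim flag, which is consumed with an atomic compare-and-swap so each request causes at most one trim. A hedged equivalent of that consume-a-flag idiom using C11 atomics rather than the TDengine atomic helpers (needTrim and consume_trim_request are illustrative names):

#include <stdatomic.h>
#include <stdbool.h>

static atomic_char needTrim;          /* set to 1 by whoever wants a heap trim */

/* Consume the request exactly once: only the caller that flips 1 -> 0 performs
   the trim; concurrent callers see the flag already cleared and do nothing. */
static bool consume_trim_request(void) {
  char expected = 1;
  return atomic_compare_exchange_strong(&needTrim, &expected, 0);
}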
231
#ifdef USE_AUDIT
232
static void *dmAuditThreadFp(void *param) {
1,808✔
233
  SDnodeMgmt *pMgmt = param;
1,808✔
234
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
235
  setThreadName("dnode-audit");
1,808✔
236

237
  while (1) {
886,478✔
238
    taosMsleep(100);
888,286✔
239
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
888,286!
240

241
    int64_t curTime = taosGetTimestampMs();
886,478✔
242
    if (curTime < lastTime) lastTime = curTime;
886,478!
243
    float interval = curTime - lastTime;
886,478✔
244
    if (interval >= tsAuditInterval) {
886,478✔
245
      (*pMgmt->sendAuditRecordsFp)();
16,856✔
246
      lastTime = curTime;
16,856✔
247
    }
248
  }
249

250
  return NULL;
1,808✔
251
}
252
#endif
253
#ifdef USE_REPORT
254
static void *dmCrashReportThreadFp(void *param) {
×
255
  int32_t     code = 0;
×
256
  SDnodeMgmt *pMgmt = param;
×
257
  int64_t     lastTime = taosGetTimestampMs();
×
258
  setThreadName("dnode-crashReport");
×
259
  char filepath[PATH_MAX] = {0};
×
260
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
261
  char     *pMsg = NULL;
×
262
  int64_t   msgLen = 0;
×
263
  TdFilePtr pFile = NULL;
×
264
  bool      truncateFile = false;
×
265
  int32_t   sleepTime = 200;
×
266
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
267
  int32_t   loopTimes = reportPeriodNum;
×
268

269
  STelemAddrMgmt mgt = {0};
×
270
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
271
  if (code != 0) {
×
272
    dError("failed to init telemetry since %s", tstrerror(code));
×
273
    return NULL;
×
274
  }
275
  code = initCrashLogWriter();
×
276
  if (code != 0) {
×
277
    dError("failed to init crash log writer since %s", tstrerror(code));
×
278
    return NULL;
×
279
  }
280

281
  while (1) {
282
    checkAndPrepareCrashInfo();
×
283
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
284
      break;
×
285
    }
286
    if (loopTimes++ < reportPeriodNum) {
×
287
      taosMsleep(sleepTime);
×
288
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
289
      continue;
×
290
    }
291
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
292
    if (pMsg && msgLen > 0) {
×
293
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
294
        dError("failed to send crash report");
×
295
        if (pFile) {
×
296
          taosReleaseCrashLogFile(pFile, false);
×
297
          pFile = NULL;
×
298

299
          taosMsleep(sleepTime);
×
300
          loopTimes = 0;
×
301
          continue;
×
302
        }
303
      } else {
304
        dInfo("succeed to send crash report");
×
305
        truncateFile = true;
×
306
      }
307
    } else {
308
      dInfo("no crash info was found");
×
309
    }
310

311
    taosMemoryFree(pMsg);
×
312

313
    if (pMsg && msgLen > 0) {
×
314
      pMsg = NULL;
×
315
      continue;
×
316
    }
317

318
    if (pFile) {
×
319
      taosReleaseCrashLogFile(pFile, truncateFile);
×
320
      pFile = NULL;
×
321
      truncateFile = false;
×
322
    }
323

324
    taosMsleep(sleepTime);
×
325
    loopTimes = 0;
×
326
  }
327
  taosTelemetryDestroy(&mgt);
×
328

329
  return NULL;
×
330
}
331
#endif
332

333
static void *dmMetricsThreadFp(void *param) {
1,808✔
334
  SDnodeMgmt *pMgmt = param;
1,808✔
335
  int64_t     lastTime = taosGetTimestampMs();
1,808✔
336
  setThreadName("dnode-metrics");
1,808✔
337
  while (1) {
443,014✔
338
    taosMsleep(200);
444,822✔
339
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
444,822!
340

341
    int64_t curTime = taosGetTimestampMs();
443,014✔
342
    if (curTime < lastTime) lastTime = curTime;
443,014!
343
    float interval = (curTime - lastTime) / 1000.0f;
443,014✔
344
    if (interval >= tsMetricsInterval) {
443,014✔
345
      (*pMgmt->sendMetricsReportFp)();
2,245✔
346
      (*pMgmt->metricsCleanExpiredSamplesFp)();
2,245✔
347
      lastTime = curTime;
2,245✔
348
    }
349
  }
350
  return NULL;
1,808✔
351
}
352

353
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
1,808✔
354
  int32_t      code = 0;
1,808✔
355
  TdThreadAttr thAttr;
356
  (void)taosThreadAttrInit(&thAttr);
1,808✔
357
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
358
#ifdef TD_COMPACT_OS
359
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
360
#endif
361
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
1,808!
362
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
363
    dError("failed to create status thread since %s", tstrerror(code));
×
364
    return code;
×
365
  }
366

367
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
368
  tmsgReportStartup("dnode-status", "initialized");
1,808✔
369
  return 0;
1,808✔
370
}
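
dmStartStatusThread, like the other dmStart*Thread helpers below, follows the usual attribute-init / create / attribute-destroy sequence and always creates the thread joinable so the matching dmStop*Thread can join it on shutdown. A hedged sketch of that sequence in plain pthreads (start_joinable is an illustrative helper, not part of the TDengine OS layer):

#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Create a joinable worker thread, mirroring the attr-init/create/attr-destroy
   steps used by dmStartStatusThread; returns 0 on success or an errno value. */
static int start_joinable(pthread_t *tid, void *(*fn)(void *), void *arg) {
  pthread_attr_t attr;
  int err = pthread_attr_init(&attr);
  if (err != 0) return err;
  (void)pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  err = pthread_create(tid, &attr, fn, arg);
  (void)pthread_attr_destroy(&attr);
  if (err != 0) fprintf(stderr, "failed to create thread: %s\n", strerror(err));
  return err;
}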
371

372
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
1,808✔
373
  int32_t      code = 0;
1,808✔
374
  TdThreadAttr thAttr;
375
  (void)taosThreadAttrInit(&thAttr);
1,808✔
376
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
377
#ifdef TD_COMPACT_OS
378
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
379
#endif
380
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
1,808!
381
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
382
    dError("failed to create config thread since %s", tstrerror(code));
×
383
    return code;
×
384
  }
385

386
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
387
  tmsgReportStartup("config-status", "initialized");
1,808✔
388
  return 0;
1,808✔
389
}
390

391
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
1,808✔
392
  int32_t      code = 0;
1,808✔
393
  TdThreadAttr thAttr;
394
  (void)taosThreadAttrInit(&thAttr);
1,808✔
395
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
396
#ifdef TD_COMPACT_OS
397
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
398
#endif
399
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
1,808!
400
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
401
    dError("failed to create status Info thread since %s", tstrerror(code));
×
402
    return code;
×
403
  }
404

405
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
406
  tmsgReportStartup("dnode-status-info", "initialized");
1,808✔
407
  return 0;
1,808✔
408
}
409

410
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
1,808✔
411
  if (taosCheckPthreadValid(pMgmt->statusThread)) {
1,808!
412
    (void)taosThreadJoin(pMgmt->statusThread, NULL);
1,808✔
413
    taosThreadClear(&pMgmt->statusThread);
1,808✔
414
  }
415
}
1,808✔
416

417
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
1,808✔
418
  if (taosCheckPthreadValid(pMgmt->configThread)) {
1,808!
419
    (void)taosThreadJoin(pMgmt->configThread, NULL);
1,808✔
420
    taosThreadClear(&pMgmt->configThread);
1,808✔
421
  }
422
}
1,808✔
423

424
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
1,808✔
425
  if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
1,808!
426
    (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
1,808✔
427
    taosThreadClear(&pMgmt->statusInfoThread);
1,808✔
428
  }
429
}
1,808✔
430
#ifdef TD_ENTERPRISE
431
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
1,808✔
432
  int32_t      code = 0;
1,808✔
433
  TdThreadAttr thAttr;
434
  (void)taosThreadAttrInit(&thAttr);
1,808✔
435
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
436
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
1,808!
437
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
438
    dError("failed to create notify thread since %s", tstrerror(code));
×
439
    return code;
×
440
  }
441

442
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
443
  tmsgReportStartup("dnode-notify", "initialized");
1,808✔
444
  return 0;
1,808✔
445
}
446

447
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
1,808✔
448
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
1,808!
449
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
1,808!
450
      dError("failed to post notify sem");
×
451
    }
452

453
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
1,808✔
454
    taosThreadClear(&pMgmt->notifyThread);
1,808✔
455
  }
456
  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
1,808!
457
    dError("failed to destroy notify sem");
×
458
  }
459
}
1,808✔
460
#endif
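
dmStopNotifyThread posts the semaphore before joining, so a notify thread blocked in tsem_wait wakes up, sees the stopped flag on its next loop iteration, and exits; only after the join does the semaphore get destroyed. A hedged sketch of that shutdown ordering with POSIX semaphores (worker, stop_and_join, and the globals here are illustrative, not TDengine symbols):

#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

static sem_t       wake;
static atomic_bool stopped;

static void *worker(void *arg) {
  (void)arg;
  while (!atomic_load(&stopped)) {
    sem_wait(&wake);            /* sleep until there is work or a stop request */
    /* ... handle one notification ... */
  }
  return NULL;
}

/* Shutdown order used above: set the flag, post to unblock, join, then destroy. */
static void stop_and_join(pthread_t tid) {
  atomic_store(&stopped, true);
  sem_post(&wake);              /* wake a worker blocked in sem_wait */
  pthread_join(tid, NULL);
  sem_destroy(&wake);
}

int main(void) {
  pthread_t tid;
  sem_init(&wake, 0, 0);        /* counterpart of tsem_init(&dmNotifyHdl.sem, 0, 0) */
  pthread_create(&tid, NULL, worker, NULL);
  stop_and_join(tid);
  return 0;
}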
461
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
1,808✔
462
  int32_t      code = 0;
1,808✔
463
#ifdef USE_MONITOR
464
  TdThreadAttr thAttr;
465
  (void)taosThreadAttrInit(&thAttr);
1,808✔
466
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
467
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
1,808!
468
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
469
    dError("failed to create monitor thread since %s", tstrerror(code));
×
470
    return code;
×
471
  }
472

473
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
474
  tmsgReportStartup("dnode-monitor", "initialized");
1,808✔
475
#endif
476
  return 0;
1,808✔
477
}
478

479
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
1,808✔
480
  int32_t      code = 0;
1,808✔
481
#ifdef USE_AUDIT  
482
  TdThreadAttr thAttr;
483
  (void)taosThreadAttrInit(&thAttr);
1,808✔
484
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
485
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
1,808!
486
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
487
    dError("failed to create audit thread since %s", tstrerror(code));
×
488
    return code;
×
489
  }
490

491
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
492
  tmsgReportStartup("dnode-audit", "initialized");
1,808✔
493
#endif  
494
  return 0;
1,808✔
495
}
496

497
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
1,808✔
498
  int32_t code = 0;
1,808✔
499
#ifdef USE_MONITOR
500
  TdThreadAttr thAttr;
501
  (void)taosThreadAttrInit(&thAttr);
1,808✔
502
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,808✔
503
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
1,808!
504
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
505
    dError("failed to create metrics thread since %s", tstrerror(code));
×
506
    return code;
×
507
  }
508

509
  (void)taosThreadAttrDestroy(&thAttr);
1,808✔
510
  tmsgReportStartup("dnode-metrics", "initialized");
1,808✔
511
#endif
512
  return 0;
1,808✔
513
}
514

515
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
1,808✔
516
#ifdef USE_MONITOR
517
  if (taosCheckPthreadValid(pMgmt->monitorThread)) {
1,808!
518
    (void)taosThreadJoin(pMgmt->monitorThread, NULL);
1,808✔
519
    taosThreadClear(&pMgmt->monitorThread);
1,808✔
520
  }
521
#endif
522
}
1,808✔
523

524
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
1,808✔
525
#ifdef USE_AUDIT
526
  if (taosCheckPthreadValid(pMgmt->auditThread)) {
1,808!
527
    (void)taosThreadJoin(pMgmt->auditThread, NULL);
1,808✔
528
    taosThreadClear(&pMgmt->auditThread);
1,808✔
529
  }
530
#endif
531
}
1,808✔
532

533
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
1,808✔
534
  int32_t code = 0;
1,808✔
535
#ifdef USE_REPORT
536
  if (!tsEnableCrashReport) {
1,808!
537
    return 0;
1,808✔
538
  }
539

540
  TdThreadAttr thAttr;
541
  (void)taosThreadAttrInit(&thAttr);
×
542
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
543
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
×
544
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
545
    dError("failed to create crashReport thread since %s", tstrerror(code));
×
546
    return code;
×
547
  }
548

549
  (void)taosThreadAttrDestroy(&thAttr);
×
550
  tmsgReportStartup("dnode-crashReport", "initialized");
×
551
#endif
552
  return 0;
×
553
}
554

555
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
1,808✔
556
#ifdef USE_REPORT
557
  if (!tsEnableCrashReport) {
1,808!
558
    return;
1,808✔
559
  }
560

561
  if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
×
562
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
×
563
    taosThreadClear(&pMgmt->crashReportThread);
×
564
  }
565
#endif
566
}
567

568
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
1,808✔
569
  if (taosCheckPthreadValid(pMgmt->metricsThread)) {
1,808!
570
    (void)taosThreadJoin(pMgmt->metricsThread, NULL);
1,808✔
571
    taosThreadClear(&pMgmt->metricsThread);
1,808✔
572
  }
573
}
1,808✔
574

575
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
222,633✔
576
  SDnodeMgmt *pMgmt = pInfo->ahandle;
222,633✔
577
  int32_t     code = -1;
222,633✔
578
  STraceId   *trace = &pMsg->info.traceId;
222,633✔
579
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
222,633!
580

581
  switch (pMsg->msgType) {
222,633!
582
    case TDMT_DND_CONFIG_DNODE:
199✔
583
      code = dmProcessConfigReq(pMgmt, pMsg);
199✔
584
      break;
199✔
585
    case TDMT_MND_AUTH_RSP:
×
586
      code = dmProcessAuthRsp(pMgmt, pMsg);
×
587
      break;
×
588
    case TDMT_MND_GRANT_RSP:
×
589
      code = dmProcessGrantRsp(pMgmt, pMsg);
×
590
      break;
×
591
    case TDMT_DND_CREATE_MNODE:
93✔
592
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
93✔
593
      break;
93✔
594
    case TDMT_DND_DROP_MNODE:
5✔
595
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
5✔
596
      break;
5✔
597
    case TDMT_DND_CREATE_QNODE:
43✔
598
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
43✔
599
      break;
43✔
600
    case TDMT_DND_DROP_QNODE:
12✔
601
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
12✔
602
      break;
12✔
603
    case TDMT_DND_CREATE_SNODE:
237✔
604
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
237✔
605
      break;
237✔
606
    case TDMT_DND_ALTER_SNODE:
435✔
607
      code = (*pMgmt->processAlterNodeFp)(SNODE, pMsg);
435✔
608
      break;
435✔
609
    case TDMT_DND_DROP_SNODE:
155✔
610
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
155✔
611
      break;
155✔
612
    case TDMT_DND_CREATE_BNODE:
83✔
613
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
83✔
614
      break;
83✔
615
    case TDMT_DND_DROP_BNODE:
82✔
616
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
82✔
617
      break;
82✔
618
    case TDMT_DND_ALTER_MNODE_TYPE:
1,006✔
619
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
1,006✔
620
      break;
1,006✔
621
    case TDMT_DND_SERVER_STATUS:
4✔
622
      code = dmProcessServerRunStatus(pMgmt, pMsg);
4✔
623
      break;
4✔
624
    case TDMT_DND_SYSTABLE_RETRIEVE:
270✔
625
      code = dmProcessRetrieve(pMgmt, pMsg);
270✔
626
      break;
270✔
627
    case TDMT_MND_GRANT:
2,844✔
628
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
2,844✔
629
      break;
2,844✔
630
    case TDMT_MND_GRANT_NOTIFY:
217,160✔
631
      code = dmProcessGrantNotify(NULL, pMsg);
217,160✔
632
      break;
217,160✔
633
    case TDMT_DND_CREATE_ENCRYPT_KEY:
5✔
634
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
5✔
635
      break;
5✔
636
    default:
×
637
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
638
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
639
      break;
×
640
  }
641

642
  if (IsReq(pMsg)) {
222,633!
643
    if (code != 0 && terrno != 0) code = terrno;
222,633✔
644
    SRpcMsg rsp = {
222,633✔
645
        .code = code,
646
        .pCont = pMsg->info.rsp,
222,633✔
647
        .contLen = pMsg->info.rspLen,
222,633✔
648
        .info = pMsg->info,
649
    };
650

651
    code = rpcSendResponse(&rsp);
222,633✔
652
    if (code != 0) {
222,633!
653
      dError("failed to send response since %s", tstrerror(code));
×
654
    }
655
  }
656

657
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
222,633✔
658
  rpcFreeCont(pMsg->pCont);
222,633✔
659
  taosFreeQitem(pMsg);
222,633✔
660
}
222,633✔
661

662
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
30,392✔
663
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
30,392✔
664
  if (pMsg->code) {
30,392✔
665
    *pWorkerIdx = 0;
478✔
666
    return TSDB_CODE_SUCCESS;
478✔
667
  }
668
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
29,914✔
669
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
29,914✔
670
  return TSDB_CODE_SUCCESS;
29,914✔
671
}
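
dmDispatchStreamHbMsg routes each stream heartbeat response to a worker by taking the stream group id modulo the number of stream-mgmt threads, so all messages for a given group land on the same worker queue (messages carrying an error code fall back to worker 0). A tiny hedged illustration of that mapping (pick_worker is not a TDengine function):

#include <stdint.h>
#include <stdio.h>

/* Pick a worker index for a stream group id: equal ids always map to the same
   worker, which serializes per-group processing on one queue. */
static int32_t pick_worker(uint64_t stream_gid, int32_t num_workers) {
  if (num_workers <= 0) return 0;               /* defensive fallback */
  return (int32_t)(stream_gid % (uint64_t)num_workers);
}

int main(void) {
  for (uint64_t gid = 0; gid < 8; ++gid) {
    printf("gid %llu -> worker %d\n", (unsigned long long)gid, pick_worker(gid, 3));
  }
  return 0;
}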
672

673

674
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
30,392✔
675
  SDnodeMgmt *pMgmt = pInfo->ahandle;
30,392✔
676
  int32_t     code = -1;
30,392✔
677
  STraceId   *trace = &pMsg->info.traceId;
30,392✔
678
  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
30,392!
679

680
  switch (pMsg->msgType) {
30,392!
681
    case TDMT_MND_STREAM_HEARTBEAT_RSP:
30,392✔
682
      code = dmProcessStreamHbRsp(pMgmt, pMsg);
30,392✔
683
      break;
30,392✔
684
    default:
×
685
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
686
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
687
      break;
×
688
  }
689

690
  if (IsReq(pMsg)) {
30,392!
691
    if (code != 0 && terrno != 0) code = terrno;
×
692
    SRpcMsg rsp = {
×
693
        .code = code,
694
        .pCont = pMsg->info.rsp,
×
695
        .contLen = pMsg->info.rspLen,
×
696
        .info = pMsg->info,
697
    };
698

699
    code = rpcSendResponse(&rsp);
×
700
    if (code != 0) {
×
701
      dError("failed to send response since %s", tstrerror(code));
×
702
    }
703
  }
704

705
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
30,392✔
706
  rpcFreeCont(pMsg->pCont);
30,392✔
707
  taosFreeQitem(pMsg);
30,392✔
708
}
30,392✔
709

710

711
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
1,808✔
712
  int32_t          code = 0;
1,808✔
713
  SSingleWorkerCfg cfg = {
1,808✔
714
      .min = 1,
715
      .max = 1,
716
      .name = "dnode-mgmt",
717
      .fp = (FItem)dmProcessMgmtQueue,
718
      .param = pMgmt,
719
  };
720
  if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
1,808!
721
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
×
722
    return code;
×
723
  }
724

725
  SDispatchWorkerPool* pStMgmtpool = &pMgmt->streamMgmtWorker;
1,808✔
726
  pStMgmtpool->max = tsNumOfStreamMgmtThreads;
1,808✔
727
  pStMgmtpool->name = "dnode-stream-mgmt";
1,808✔
728
  code = tDispatchWorkerInit(pStMgmtpool);
1,808✔
729
  if (code != 0) {
1,808!
730
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
×
731
    return code;
×
732
  }
733
  code = tDispatchWorkerAllocQueue(pStMgmtpool, pMgmt, (FItem)dmProcessStreamMgmtQueue, dmDispatchStreamHbMsg);
1,808✔
734
  if (code != 0) {
1,808!
735
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
×
736
    return code;
×
737
  }
738

739
  dDebug("dnode workers are initialized");
1,808✔
740
  return 0;
1,808✔
741
}
742

743
void dmStopWorker(SDnodeMgmt *pMgmt) {
1,808✔
744
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
1,808✔
745
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorker);
1,808✔
746
  dDebug("dnode workers are closed");
1,808✔
747
}
1,808✔
748

749
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
222,633✔
750
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
222,633✔
751
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
222,633✔
752
  return taosWriteQitem(pWorker->queue, pMsg);
222,633✔
753
}
754

755
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
30,392✔
756
  return tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorker, pMsg);
30,392✔
757
}