• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4348

21 Jun 2025 07:48AM UTC coverage: 62.366% (+1.8%) from 60.571%
#4348

push

travis-ci

web-flow
docs: add OpenMetrics support and configuration details to taosAdapter documentation (#31427)

* docs: add OpenMetrics support and configuration details to taosAdapter documentation

* docs: enhance OpenMetrics section in taosAdapter documentation

156282 of 319947 branches covered (48.85%)

Branch coverage included in aggregate %.

242147 of 318911 relevant lines covered (75.93%)

6151642.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.24
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
2,660✔
22
  SDnodeMgmt *pMgmt = param;
2,660✔
23
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
24
  setThreadName("dnode-status");
2,660✔
25

26
  while (1) {
484,403✔
27
    taosMsleep(200);
487,063✔
28
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
487,062!
29

30
    int64_t curTime = taosGetTimestampMs();
484,403✔
31
    if (curTime < lastTime) lastTime = curTime;
484,403!
32
    float interval = (curTime - lastTime) / 1000.0f;
484,403✔
33
    if (interval >= tsStatusInterval) {
484,403✔
34
      dmSendStatusReq(pMgmt);
96,606✔
35
      lastTime = curTime;
96,606✔
36
    }
37
  }
38

39
  return NULL;
2,659✔
40
}
41

42
static void *dmConfigThreadFp(void *param) {
2,660✔
43
  SDnodeMgmt *pMgmt = param;
2,660✔
44
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
45
  setThreadName("dnode-config");
2,660✔
46
  while (1) {
13,419✔
47
    taosMsleep(200);
16,079✔
48
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
16,079!
49

50
    int64_t curTime = taosGetTimestampMs();
13,419✔
51
    if (curTime < lastTime) lastTime = curTime;
13,419!
52
    float interval = (curTime - lastTime) / 1000.0f;
13,419✔
53
    if (interval >= tsStatusInterval) {
13,419✔
54
      dmSendConfigReq(pMgmt);
2,733✔
55
      lastTime = curTime;
2,733✔
56
    }
57
  }
58
  return NULL;
2,660✔
59
}
60

61
static void *dmStatusInfoThreadFp(void *param) {
2,660✔
62
  SDnodeMgmt *pMgmt = param;
2,660✔
63
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
64
  setThreadName("dnode-status-info");
2,660✔
65

66
  int32_t upTimeCount = 0;
2,660✔
67
  int64_t upTime = 0;
2,660✔
68

69
  while (1) {
504,446✔
70
    taosMsleep(200);
507,106✔
71
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
507,105!
72

73
    int64_t curTime = taosGetTimestampMs();
504,446✔
74
    if (curTime < lastTime) lastTime = curTime;
504,446!
75
    float interval = (curTime - lastTime) / 1000.0f;
504,446✔
76
    if (interval >= tsStatusInterval) {
504,446✔
77
      dmUpdateStatusInfo(pMgmt);
99,933✔
78
      lastTime = curTime;
99,933✔
79

80
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
99,933✔
81
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
834✔
82
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
834✔
83
      }
84
    }
85
  }
86

87
  return NULL;
2,659✔
88
}
89

90
#if defined(TD_ENTERPRISE)
91
SDmNotifyHandle dmNotifyHdl = {.state = 0};
92
#define TIMESERIES_STASH_NUM 5
93
static void *dmNotifyThreadFp(void *param) {
2,660✔
94
  SDnodeMgmt *pMgmt = param;
2,660✔
95
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
96
  setThreadName("dnode-notify");
2,660✔
97

98
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
2,660!
99
    return NULL;
×
100
  }
101

102
  // calculate approximate timeSeries per second
103
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
104
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
105
  int64_t  approximateTimeSeries = 0;
2,660✔
106
  uint64_t nTotalNotify = 0;
2,660✔
107
  int32_t  head, tail = 0;
2,660✔
108

109
  bool       wait = true;
2,660✔
110
  int32_t    nDnode = 0;
2,660✔
111
  int64_t    lastNotify = 0;
2,660✔
112
  int64_t    lastFetchDnode = 0;
2,660✔
113
  SNotifyReq req = {0};
2,660✔
114
  while (1) {
90,990✔
115
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
93,650!
116
    if (wait) tsem_wait(&dmNotifyHdl.sem);
90,991✔
117
    atomic_store_8(&dmNotifyHdl.state, 1);
90,990✔
118

119
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
90,990✔
120
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
90,990!
121
      goto _skip;
90,985✔
122
    }
123
    int64_t current = taosGetTimestampMs();
5✔
124
    if (current - lastFetchDnode > 1000) {
5!
125
      nDnode = dmGetDnodeSize(pMgmt->pData);
5✔
126
      if (nDnode < 1) nDnode = 1;
5!
127
      lastFetchDnode = current;
5✔
128
    }
129
    if (req.dnodeId == 0 || req.clusterId == 0) {
5!
130
      req.dnodeId = pMgmt->pData->dnodeId;
5✔
131
      req.clusterId = pMgmt->pData->clusterId;
5✔
132
    }
133

134
    if (current - lastNotify < 10) {
5!
135
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
136
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
137
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
138
        taosMsleep(10);
×
139
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
140
        taosMsleep(5);
×
141
      } else {
142
        taosMsleep(2);
×
143
      }
144
    }
145

146
    SMonVloadInfo vinfo = {0};
5✔
147
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
5✔
148
    req.pVloads = vinfo.pVloads;
5✔
149
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
5✔
150
    int64_t nTimeSeries = 0;
5✔
151
    for (int32_t i = 0; i < nVgroup; ++i) {
5!
152
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
153
      nTimeSeries += vload->nTimeSeries;
×
154
    }
155
    notifyTimeSeries[tail] = nTimeSeries;
5✔
156
    notifyTimeStamp[tail] = taosGetTimestampNs();
5✔
157
    ++nTotalNotify;
5✔
158

159
    approximateTimeSeries = 0;
5✔
160
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
5!
161
      head = tail - TIMESERIES_STASH_NUM + 1;
×
162
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
163
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
164
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
165
      if (tsDiff > 0) {
×
166
        if (timeDiff > 0 && timeDiff < 1e9) {
×
167
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
168
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
169
            dmSendNotifyReq(pMgmt, &req);
×
170
          }
171
        } else {
172
          dmSendNotifyReq(pMgmt, &req);
×
173
        }
174
      }
175
    } else {
176
      dmSendNotifyReq(pMgmt, &req);
5✔
177
    }
178
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
5!
179

180
    tFreeSNotifyReq(&req);
5✔
181
    lastNotify = taosGetTimestampMs();
5✔
182
  _skip:
90,990✔
183
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
90,990✔
184
      wait = true;
90,959✔
185
      continue;
90,959✔
186
    }
187
    wait = false;
31✔
188
  }
189

190
  return NULL;
2,659✔
191
}
192
#endif
193

194
#ifdef USE_MONITOR
195
static void *dmMonitorThreadFp(void *param) {
2,660✔
196
  SDnodeMgmt *pMgmt = param;
2,660✔
197
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
198
  int64_t     lastTimeForBasic = taosGetTimestampMs();
2,660✔
199
  setThreadName("dnode-monitor");
2,660✔
200

201
  static int32_t TRIM_FREQ = 20;
202
  int32_t        trimCount = 0;
2,660✔
203

204
  while (1) {
504,720✔
205
    taosMsleep(200);
507,380✔
206
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
507,379!
207

208
    int64_t curTime = taosGetTimestampMs();
504,720✔
209

210
    if (curTime < lastTime) lastTime = curTime;
504,720!
211
    float interval = (curTime - lastTime) / 1000.0f;
504,720✔
212
    if (interval >= tsMonitorInterval) {
504,720✔
213
      (*pMgmt->sendMonitorReportFp)();
2,417✔
214
      (*pMgmt->monitorCleanExpiredSamplesFp)();
2,417✔
215
      lastTime = curTime;
2,417✔
216

217
      trimCount = (trimCount + 1) % TRIM_FREQ;
2,417✔
218
      if (trimCount == 0) {
2,417✔
219
        taosMemoryTrim(0, NULL);
1!
220
      }
221
    }
222
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
504,720✔
223
      taosMemoryTrim(0, NULL);
18,463!
224
    }
225
  }
226

227
  return NULL;
2,659✔
228
}
229
#endif
230
#ifdef USE_AUDIT
231
static void *dmAuditThreadFp(void *param) {
2,660✔
232
  SDnodeMgmt *pMgmt = param;
2,660✔
233
  int64_t     lastTime = taosGetTimestampMs();
2,660✔
234
  setThreadName("dnode-audit");
2,660✔
235

236
  while (1) {
1,010,517✔
237
    taosMsleep(100);
1,013,177✔
238
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,013,176!
239

240
    int64_t curTime = taosGetTimestampMs();
1,010,517✔
241
    if (curTime < lastTime) lastTime = curTime;
1,010,517!
242
    float interval = curTime - lastTime;
1,010,517✔
243
    if (interval >= tsAuditInterval) {
1,010,517✔
244
      (*pMgmt->sendAuditRecordsFp)();
18,916✔
245
      lastTime = curTime;
18,916✔
246
    }
247
  }
248

249
  return NULL;
2,659✔
250
}
251
#endif
252
#ifdef USE_REPORT
253
static void *dmCrashReportThreadFp(void *param) {
×
254
  int32_t     code = 0;
×
255
  SDnodeMgmt *pMgmt = param;
×
256
  int64_t     lastTime = taosGetTimestampMs();
×
257
  setThreadName("dnode-crashReport");
×
258
  char filepath[PATH_MAX] = {0};
×
259
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
260
  char     *pMsg = NULL;
×
261
  int64_t   msgLen = 0;
×
262
  TdFilePtr pFile = NULL;
×
263
  bool      truncateFile = false;
×
264
  int32_t   sleepTime = 200;
×
265
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
266
  int32_t   loopTimes = reportPeriodNum;
×
267

268
  STelemAddrMgmt mgt = {0};
×
269
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
270
  if (code != 0) {
×
271
    dError("failed to init telemetry since %s", tstrerror(code));
×
272
    return NULL;
×
273
  }
274
  code = initCrashLogWriter();
×
275
  if (code != 0) {
×
276
    dError("failed to init crash log writer since %s", tstrerror(code));
×
277
    return NULL;
×
278
  }
279

280
  while (1) {
281
    checkAndPrepareCrashInfo();
×
282
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
283
      break;
×
284
    }
285
    if (loopTimes++ < reportPeriodNum) {
×
286
      taosMsleep(sleepTime);
×
287
      if(loopTimes < 0) loopTimes = reportPeriodNum;
×
288
      continue;
×
289
    }
290
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
291
    if (pMsg && msgLen > 0) {
×
292
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
293
        dError("failed to send crash report");
×
294
        if (pFile) {
×
295
          taosReleaseCrashLogFile(pFile, false);
×
296
          pFile = NULL;
×
297

298
          taosMsleep(sleepTime);
×
299
          loopTimes = 0;
×
300
          continue;
×
301
        }
302
      } else {
303
        dInfo("succeed to send crash report");
×
304
        truncateFile = true;
×
305
      }
306
    } else {
307
      dInfo("no crash info was found");
×
308
    }
309

310
    taosMemoryFree(pMsg);
×
311

312
    if (pMsg && msgLen > 0) {
×
313
      pMsg = NULL;
×
314
      continue;
×
315
    }
316

317
    if (pFile) {
×
318
      taosReleaseCrashLogFile(pFile, truncateFile);
×
319
      pFile = NULL;
×
320
      truncateFile = false;
×
321
    }
322

323
    taosMsleep(sleepTime);
×
324
    loopTimes = 0;
×
325
  }
326
  taosTelemetryDestroy(&mgt);
×
327

328
  return NULL;
×
329
}
330
#endif
331

332
/*
 * Creates the joinable status thread running dmStatusThreadFp and stores its
 * handle in pMgmt->statusThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread could not
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
350

351
/*
 * Creates the joinable config thread running dmConfigThreadFp and stores its
 * handle in pMgmt->configThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread could not
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("config-status", "initialized");
  return 0;
}
369

370
/*
 * Creates the joinable status-info thread running dmStatusInfoThreadFp and
 * stores its handle in pMgmt->statusInfoThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread could not
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
388

389
/*
 * Joins the status thread and clears its handle. Does nothing when
 * pMgmt->statusThread is not a valid thread handle.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
2,659✔
395

396
/*
 * Joins the config thread and clears its handle. Does nothing when
 * pMgmt->configThread is not a valid thread handle.
 */
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
2,659✔
402

403
/*
 * Joins the status-info thread and clears its handle. Does nothing when
 * pMgmt->statusInfoThread is not a valid thread handle.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
2,659✔
409
#ifdef TD_ENTERPRISE
410
/*
 * Creates the joinable notify thread running dmNotifyThreadFp and stores its
 * handle in pMgmt->notifyThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread could not
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
425

426
/*
 * Stops the notify thread: when pMgmt->notifyThread is a valid handle, posts
 * dmNotifyHdl.sem (the thread function waits on it), joins the thread and
 * clears the handle. The semaphore is then destroyed regardless of whether a
 * thread was joined. Failures to post or destroy are only logged.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
    int32_t postRet = tsem_post(&dmNotifyHdl.sem);
    if (postRet != 0) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  int32_t destroyRet = tsem_destroy(&dmNotifyHdl.sem);
  if (destroyRet != 0) {
    dError("failed to destroy notify sem");
  }
}
2,659✔
439
#endif
440
/*
 * Creates the joinable monitor thread running dmMonitorThreadFp and stores
 * its handle in pMgmt->monitorThread. When USE_MONITOR is not defined the
 * function does nothing.
 *
 * Returns 0 on success (or when compiled out), or TAOS_SYSTEM_ERROR(ERRNO)
 * if the thread could not be created. The thread attribute object is
 * destroyed on both paths.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
457

458
/*
 * Creates the joinable audit thread running dmAuditThreadFp and stores its
 * handle in pMgmt->auditThread. When USE_AUDIT is not defined the function
 * does nothing.
 *
 * Returns 0 on success (or when compiled out), or TAOS_SYSTEM_ERROR(ERRNO)
 * if the thread could not be created. The thread attribute object is
 * destroyed on both paths.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
475

476
/*
 * Joins the monitor thread and clears its handle. Does nothing when
 * pMgmt->monitorThread is not a valid thread handle, or when USE_MONITOR is
 * not defined.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
2,659✔
484

485
/*
 * Joins the audit thread and clears its handle. Does nothing when
 * pMgmt->auditThread is not a valid thread handle, or when USE_AUDIT is not
 * defined.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
2,659✔
493

494
/*
 * Creates the joinable crash-report thread running dmCrashReportThreadFp and
 * stores its handle in pMgmt->crashReportThread. Nothing is started when
 * tsEnableCrashReport is false or when USE_REPORT is not defined.
 *
 * Returns 0 on success (or when nothing is started), or
 * TAOS_SYSTEM_ERROR(ERRNO) if the thread could not be created. The thread
 * attribute object is destroyed on both paths.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture the error first: the cleanup call below may overwrite errno.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
515

516
/*
 * Joins the crash-report thread and clears its handle. Does nothing when
 * tsEnableCrashReport is false, when pMgmt->crashReportThread is not a valid
 * thread handle, or when USE_REPORT is not defined.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
#endif
}
528

529
/*
 * Queue callback of the dnode-mgmt worker: dispatches one RPC message by its
 * msgType, sends a response when the message is a request, and then frees
 * the message content and the queue item.
 *
 * pInfo: queue info; its ahandle is the owning SDnodeMgmt.
 * pMsg:  the message to process; it is released before returning.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  // NOTE(review): the dG* log macros appear to pick up this local by name,
  // so it must stay called `trace` - confirm against the macro definitions.
  STraceId   *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    /* Requests handled by the dnode management module itself. */
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;

    /* Node create/drop/alter requests forwarded through callbacks. */
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;

    /* Status, retrieval, grant and encryption-key requests. */
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      code = dmProcessGrantNotify(NULL, pMsg);
      break;
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;

    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
  }

  if (IsReq(pMsg)) {
    // On failure, report terrno instead of the handler's code when it is set.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg rsp = {0};
    rsp.code = code;
    rsp.pCont = pMsg->info.rsp;
    rsp.contLen = pMsg->info.rspLen;
    rsp.info = pMsg->info;

    // From here on `code` holds the result of sending the response.
    code = rpcSendResponse(&rsp);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
194,228✔
606

607
/*
 * Initializes pMgmt->mgmtWorker as a single-threaded worker named
 * "dnode-mgmt" (min = max = 1) whose items are handled by
 * dmProcessMgmtQueue with pMgmt as its parameter.
 *
 * Returns 0 on success, or the non-zero code from tSingleWorkerInit().
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg workerCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &workerCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
624

625
/*
 * Cleans up the dnode-mgmt worker held in pMgmt->mgmtWorker and logs that
 * the workers are closed.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;

  tSingleWorkerCleanup(pWorker);
  dDebug("dnode workers are closed");
}
2,659✔
629

630
/*
 * Enqueues pMsg on the dnode-mgmt worker's queue.
 *
 * Returns whatever taosWriteQitem() returns.
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);

  int32_t code = taosWriteQitem(pWorker->queue, pMsg);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc