• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4404

30 Jun 2025 02:45AM UTC coverage: 62.241% (-0.4%) from 62.635%
#4404

push

travis-ci

web-flow
Merge pull request #31480 from taosdata/docs/3.0/TD-34215

add stmt2 docs

153837 of 315978 branches covered (48.69%)

Branch coverage included in aggregate %.

238272 of 314005 relevant lines covered (75.88%)

6134648.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.81
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
2,666✔
22
  SDnodeMgmt *pMgmt = param;
2,666✔
23
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
24
  setThreadName("dnode-status");
2,666✔
25

26
  while (1) {
475,208✔
27
    taosMsleep(200);
477,874✔
28
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
477,874!
29

30
    int64_t curTime = taosGetTimestampMs();
475,208✔
31
    if (curTime < lastTime) lastTime = curTime;
475,208!
32
    float interval = (curTime - lastTime) / 1000.0f;
475,208✔
33
    if (interval >= tsStatusInterval) {
475,208✔
34
      dmSendStatusReq(pMgmt);
94,885✔
35
      lastTime = curTime;
94,885✔
36
    }
37
  }
38

39
  return NULL;
2,666✔
40
}
41

42
static void *dmConfigThreadFp(void *param) {
2,666✔
43
  SDnodeMgmt *pMgmt = param;
2,666✔
44
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
45
  setThreadName("dnode-config");
2,666✔
46
  while (1) {
13,432✔
47
    taosMsleep(200);
16,098✔
48
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
16,098!
49

50
    int64_t curTime = taosGetTimestampMs();
13,432✔
51
    if (curTime < lastTime) lastTime = curTime;
13,432!
52
    float interval = (curTime - lastTime) / 1000.0f;
13,432✔
53
    if (interval >= tsStatusInterval) {
13,432✔
54
      dmSendConfigReq(pMgmt);
2,731✔
55
      lastTime = curTime;
2,731✔
56
    }
57
  }
58
  return NULL;
2,666✔
59
}
60

61
static void *dmStatusInfoThreadFp(void *param) {
2,666✔
62
  SDnodeMgmt *pMgmt = param;
2,666✔
63
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
64
  setThreadName("dnode-status-info");
2,666✔
65

66
  int32_t upTimeCount = 0;
2,666✔
67
  int64_t upTime = 0;
2,666✔
68

69
  while (1) {
496,408✔
70
    taosMsleep(200);
499,074✔
71
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
499,074!
72

73
    int64_t curTime = taosGetTimestampMs();
496,408✔
74
    if (curTime < lastTime) lastTime = curTime;
496,408!
75
    float interval = (curTime - lastTime) / 1000.0f;
496,408✔
76
    if (interval >= tsStatusInterval) {
496,408✔
77
      dmUpdateStatusInfo(pMgmt);
98,419✔
78
      lastTime = curTime;
98,419✔
79

80
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
98,419✔
81
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
787✔
82
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
787✔
83
      }
84
    }
85
  }
86

87
  return NULL;
2,666✔
88
}
89

90
#if defined(TD_ENTERPRISE)
91
SDmNotifyHandle dmNotifyHdl = {.state = 0};
92
#define TIMESERIES_STASH_NUM 5
93
static void *dmNotifyThreadFp(void *param) {
2,666✔
94
  SDnodeMgmt *pMgmt = param;
2,666✔
95
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
96
  setThreadName("dnode-notify");
2,666✔
97

98
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
2,666!
99
    return NULL;
×
100
  }
101

102
  // calculate approximate timeSeries per second
103
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
104
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
105
  int64_t  approximateTimeSeries = 0;
2,666✔
106
  uint64_t nTotalNotify = 0;
2,666✔
107
  int32_t  head, tail = 0;
2,666✔
108

109
  bool       wait = true;
2,666✔
110
  int32_t    nDnode = 0;
2,666✔
111
  int64_t    lastNotify = 0;
2,666✔
112
  int64_t    lastFetchDnode = 0;
2,666✔
113
  SNotifyReq req = {0};
2,666✔
114
  while (1) {
86,664✔
115
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
89,330!
116
    if (wait) tsem_wait(&dmNotifyHdl.sem);
86,664✔
117
    atomic_store_8(&dmNotifyHdl.state, 1);
86,664✔
118

119
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
86,664✔
120
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
86,664!
121
      goto _skip;
86,659✔
122
    }
123
    int64_t current = taosGetTimestampMs();
5✔
124
    if (current - lastFetchDnode > 1000) {
5!
125
      nDnode = dmGetDnodeSize(pMgmt->pData);
5✔
126
      if (nDnode < 1) nDnode = 1;
5!
127
      lastFetchDnode = current;
5✔
128
    }
129
    if (req.dnodeId == 0 || req.clusterId == 0) {
5!
130
      req.dnodeId = pMgmt->pData->dnodeId;
5✔
131
      req.clusterId = pMgmt->pData->clusterId;
5✔
132
    }
133

134
    if (current - lastNotify < 10) {
5!
135
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
136
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
137
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
138
        taosMsleep(10);
×
139
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
140
        taosMsleep(5);
×
141
      } else {
142
        taosMsleep(2);
×
143
      }
144
    }
145

146
    SMonVloadInfo vinfo = {0};
5✔
147
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
5✔
148
    req.pVloads = vinfo.pVloads;
5✔
149
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
5✔
150
    int64_t nTimeSeries = 0;
5✔
151
    for (int32_t i = 0; i < nVgroup; ++i) {
5!
152
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
153
      nTimeSeries += vload->nTimeSeries;
×
154
    }
155
    notifyTimeSeries[tail] = nTimeSeries;
5✔
156
    notifyTimeStamp[tail] = taosGetTimestampNs();
5✔
157
    ++nTotalNotify;
5✔
158

159
    approximateTimeSeries = 0;
5✔
160
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
5!
161
      head = tail - TIMESERIES_STASH_NUM + 1;
×
162
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
163
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
164
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
165
      if (tsDiff > 0) {
×
166
        if (timeDiff > 0 && timeDiff < 1e9) {
×
167
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
168
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
169
            dmSendNotifyReq(pMgmt, &req);
×
170
          }
171
        } else {
172
          dmSendNotifyReq(pMgmt, &req);
×
173
        }
174
      }
175
    } else {
176
      dmSendNotifyReq(pMgmt, &req);
5✔
177
    }
178
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
5!
179

180
    tFreeSNotifyReq(&req);
5✔
181
    lastNotify = taosGetTimestampMs();
5✔
182
  _skip:
86,664✔
183
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
86,664✔
184
      wait = true;
86,652✔
185
      continue;
86,652✔
186
    }
187
    wait = false;
12✔
188
  }
189

190
  return NULL;
2,666✔
191
}
192
#endif
193

194
#ifdef USE_MONITOR
195
static void *dmMonitorThreadFp(void *param) {
2,666✔
196
  SDnodeMgmt *pMgmt = param;
2,666✔
197
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
198
  int64_t     lastTimeForBasic = taosGetTimestampMs();
2,666✔
199
  setThreadName("dnode-monitor");
2,666✔
200

201
  static int32_t TRIM_FREQ = 20;
202
  int32_t        trimCount = 0;
2,666✔
203

204
  while (1) {
496,625✔
205
    taosMsleep(200);
499,291✔
206
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
499,291!
207

208
    int64_t curTime = taosGetTimestampMs();
496,625✔
209

210
    if (curTime < lastTime) lastTime = curTime;
496,625!
211
    float interval = (curTime - lastTime) / 1000.0f;
496,625✔
212
    if (interval >= tsMonitorInterval) {
496,625✔
213
      (*pMgmt->sendMonitorReportFp)();
2,352✔
214
      (*pMgmt->monitorCleanExpiredSamplesFp)();
2,352✔
215
      lastTime = curTime;
2,352✔
216

217
      trimCount = (trimCount + 1) % TRIM_FREQ;
2,352✔
218
      if (trimCount == 0) {
2,352✔
219
        taosMemoryTrim(0, NULL);
1!
220
      }
221
    }
222
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
496,625✔
223
      taosMemoryTrim(0, NULL);
18,150!
224
    }
225
  }
226

227
  return NULL;
2,666✔
228
}
229
#endif
230
#ifdef USE_AUDIT
231
static void *dmAuditThreadFp(void *param) {
2,666✔
232
  SDnodeMgmt *pMgmt = param;
2,666✔
233
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
234
  setThreadName("dnode-audit");
2,666✔
235

236
  while (1) {
994,307✔
237
    taosMsleep(100);
996,973✔
238
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
996,973!
239

240
    int64_t curTime = taosGetTimestampMs();
994,307✔
241
    if (curTime < lastTime) lastTime = curTime;
994,307!
242
    float interval = curTime - lastTime;
994,307✔
243
    if (interval >= tsAuditInterval) {
994,307✔
244
      (*pMgmt->sendAuditRecordsFp)();
18,598✔
245
      lastTime = curTime;
18,598✔
246
    }
247
  }
248

249
  return NULL;
2,666✔
250
}
251
#endif
252
#ifdef USE_REPORT
253
static void *dmCrashReportThreadFp(void *param) {
×
254
  int32_t     code = 0;
×
255
  SDnodeMgmt *pMgmt = param;
×
256
  int64_t     lastTime = taosGetTimestampMs();
×
257
  setThreadName("dnode-crashReport");
×
258
  char filepath[PATH_MAX] = {0};
×
259
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
260
  char     *pMsg = NULL;
×
261
  int64_t   msgLen = 0;
×
262
  TdFilePtr pFile = NULL;
×
263
  bool      truncateFile = false;
×
264
  int32_t   sleepTime = 200;
×
265
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
266
  int32_t   loopTimes = reportPeriodNum;
×
267

268
  STelemAddrMgmt mgt = {0};
×
269
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
270
  if (code != 0) {
×
271
    dError("failed to init telemetry since %s", tstrerror(code));
×
272
    return NULL;
×
273
  }
274
  code = initCrashLogWriter();
×
275
  if (code != 0) {
×
276
    dError("failed to init crash log writer since %s", tstrerror(code));
×
277
    return NULL;
×
278
  }
279

280
  while (1) {
281
    checkAndPrepareCrashInfo();
×
282
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
283
      break;
×
284
    }
285
    if (loopTimes++ < reportPeriodNum) {
×
286
      taosMsleep(sleepTime);
×
287
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
288
      continue;
×
289
    }
290
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
291
    if (pMsg && msgLen > 0) {
×
292
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
293
        dError("failed to send crash report");
×
294
        if (pFile) {
×
295
          taosReleaseCrashLogFile(pFile, false);
×
296
          pFile = NULL;
×
297

298
          taosMsleep(sleepTime);
×
299
          loopTimes = 0;
×
300
          continue;
×
301
        }
302
      } else {
303
        dInfo("succeed to send crash report");
×
304
        truncateFile = true;
×
305
      }
306
    } else {
307
      dInfo("no crash info was found");
×
308
    }
309

310
    taosMemoryFree(pMsg);
×
311

312
    if (pMsg && msgLen > 0) {
×
313
      pMsg = NULL;
×
314
      continue;
×
315
    }
316

317
    if (pFile) {
×
318
      taosReleaseCrashLogFile(pFile, truncateFile);
×
319
      pFile = NULL;
×
320
      truncateFile = false;
×
321
    }
322

323
    taosMsleep(sleepTime);
×
324
    loopTimes = 0;
×
325
  }
326
  taosTelemetryDestroy(&mgt);
×
327

328
  return NULL;
×
329
}
330
#endif
331

332
static void *dmMetricsThreadFp(void *param) {
2,666✔
333
  SDnodeMgmt *pMgmt = param;
2,666✔
334
  int64_t     lastTime = taosGetTimestampMs();
2,666✔
335
  setThreadName("dnode-metrics");
2,666✔
336
  while (1) {
496,929✔
337
    taosMsleep(200);
499,595✔
338
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
499,595!
339

340
    int64_t curTime = taosGetTimestampMs();
496,929✔
341
    if (curTime < lastTime) lastTime = curTime;
496,929!
342
    float interval = (curTime - lastTime) / 1000.0f;
496,929✔
343
    if (interval >= tsMetricsInterval) {
496,929✔
344
      (*pMgmt->sendMetricsReportFp)();
2,330✔
345
      (*pMgmt->metricsCleanExpiredSamplesFp)();
2,330✔
346
      lastTime = curTime;
2,330✔
347
    }
348
  }
349
  return NULL;
2,666✔
350
}
351

352
/*
 * Creates the joinable "dnode-status" thread running dmStatusThreadFp and
 * stores its handle in pMgmt->statusThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
370

371
/*
 * Creates the joinable "dnode-config" thread running dmConfigThreadFp and
 * stores its handle in pMgmt->configThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  // NOTE(review): the startup step is reported as "config-status" while the
  // thread names itself "dnode-config" - confirm whether this is intentional.
  tmsgReportStartup("config-status", "initialized");
  return 0;
}
389

390
/*
 * Creates the joinable "dnode-status-info" thread running
 * dmStatusInfoThreadFp and stores its handle in pMgmt->statusInfoThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
408

409
/*
 * Joins the status thread and clears its handle.
 * Does nothing when pMgmt->statusThread is not a valid thread handle.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
2,666✔
415

416
/*
 * Joins the config thread and clears its handle.
 * Does nothing when pMgmt->configThread is not a valid thread handle.
 */
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
2,666✔
422

423
/*
 * Joins the status-info thread and clears its handle.
 * Does nothing when pMgmt->statusInfoThread is not a valid thread handle.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
2,666✔
429
#ifdef TD_ENTERPRISE
430
/*
 * Creates the joinable "dnode-notify" thread running dmNotifyThreadFp and
 * stores its handle in pMgmt->notifyThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
445

446
/*
 * Stops the notify thread and destroys dmNotifyHdl.sem.
 *
 * When pMgmt->notifyThread is a valid handle, the semaphore is posted (so a
 * thread blocked in tsem_wait() can run again), the thread is joined and its
 * handle cleared. The semaphore is destroyed whether or not the handle was
 * valid. Failures of post/destroy are only logged.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  bool threadRunning = taosCheckPthreadValid(pMgmt->notifyThread);
  if (threadRunning) {
    if (0 != tsem_post(&dmNotifyHdl.sem)) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  if (0 != tsem_destroy(&dmNotifyHdl.sem)) {
    dError("failed to destroy notify sem");
  }
}
2,666✔
459
#endif
460
/*
 * Creates the joinable "dnode-monitor" thread running dmMonitorThreadFp and
 * stores its handle in pMgmt->monitorThread. Compiles to a no-op returning 0
 * when USE_MONITOR is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
477

478
/*
 * Creates the joinable "dnode-audit" thread running dmAuditThreadFp and
 * stores its handle in pMgmt->auditThread. Compiles to a no-op returning 0
 * when USE_AUDIT is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
495

496
/*
 * Creates the joinable "dnode-metrics" thread running dmMetricsThreadFp and
 * stores its handle in pMgmt->metricsThread. Compiles to a no-op returning 0
 * when USE_MONITOR is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create metrics thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-metrics", "initialized");
#endif
  return 0;
}
513

514
/*
 * Joins the monitor thread and clears its handle.
 * Does nothing when USE_MONITOR is not defined or when
 * pMgmt->monitorThread is not a valid thread handle.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
2,666✔
522

523
/*
 * Joins the audit thread and clears its handle.
 * Does nothing when USE_AUDIT is not defined or when
 * pMgmt->auditThread is not a valid thread handle.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
2,666✔
531

532
/*
 * Creates the joinable "dnode-crashReport" thread running
 * dmCrashReportThreadFp and stores its handle in pMgmt->crashReportThread.
 * Returns 0 without creating anything when USE_REPORT is not defined or
 * tsEnableCrashReport is false.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot
 * be created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture the error first; the cleanup call below may change ERRNO.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
553

554
/*
 * Joins the crash-report thread and clears its handle.
 * Does nothing when USE_REPORT is not defined, when tsEnableCrashReport is
 * false, or when pMgmt->crashReportThread is not a valid thread handle.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  // Short-circuit keeps the handle check from running when reporting is off.
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
#endif
}
566

567
/*
 * Joins the metrics thread and clears its handle.
 * Does nothing when pMgmt->metricsThread is not a valid thread handle.
 */
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->metricsThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->metricsThread, NULL);
  taosThreadClear(&pMgmt->metricsThread);
}
2,666✔
573

574
/*
 * Queue callback of the dnode-mgmt worker: processes one RPC message.
 *
 * The message is dispatched on pMsg->msgType to the matching handler; an
 * unknown type yields TSDB_CODE_MSG_NOT_PROCESSED. When IsReq(pMsg) holds, a
 * response carrying the result code and pMsg->info.rsp / rspLen is sent with
 * rpcSendResponse(). Finally the message content and the queue item itself
 * are released, so pMsg must not be used by the caller afterwards.
 *
 * pInfo: queue info whose ahandle is the SDnodeMgmt instance.
 * pMsg:  the message to process.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  // `trace` is kept as a local with this exact name for the dG* log macros.
  STraceId   *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    /* dnode configuration, auth and grant responses */
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;

    /* create / drop / alter of the node kinds hosted by this dnode */
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_CREATE_BNODE:
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
      break;
    case TDMT_DND_DROP_BNODE:
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;

    /* status, system-table retrieval, grants and encryption key */
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      code = dmProcessGrantNotify(NULL, pMsg);
      break;
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;

    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
  }

  if (IsReq(pMsg)) {
    // A failing handler may have left a more specific error in terrno.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg rsp = {0};
    rsp.code = code;
    rsp.pCont = pMsg->info.rsp;
    rsp.contLen = pMsg->info.rspLen;
    rsp.info = pMsg->info;

    // From here on `code` holds the result of sending the response.
    code = rpcSendResponse(&rsp);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
186,782✔
657

658
/*
 * Initializes pMgmt->mgmtWorker as a single worker named "dnode-mgmt"
 * (min = max = 1) whose items are handled by dmProcessMgmtQueue with pMgmt
 * as the callback parameter.
 *
 * Returns 0 on success, otherwise the code from tSingleWorkerInit().
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg workerCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &workerCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
675

676
/*
 * Cleans up the dnode-mgmt worker stored in pMgmt->mgmtWorker and logs that
 * the workers are closed.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dDebug("dnode workers are closed");
}
2,666✔
680

681
/*
 * Appends pMsg to the queue of the dnode-mgmt worker.
 *
 * Returns the result of taosWriteQitem().
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *mgmtWorker = &pMgmt->mgmtWorker;
  dTrace("msg:%p, put into worker %s", pMsg, mgmtWorker->name);

  int32_t code = taosWriteQitem(mgmtWorker->queue, pMsg);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc