• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.71
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
8✔
22
  SDnodeMgmt *pMgmt = param;
8✔
23
  int64_t     lastTime = taosGetTimestampMs();
8✔
24
  setThreadName("dnode-status");
8✔
25

26
  int32_t upTimeCount = 0;
8✔
27
  int64_t upTime = 0;
8✔
28

29
  while (1) {
116✔
30
    taosMsleep(200);
124✔
31
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
124!
32

33
    int64_t curTime = taosGetTimestampMs();
116✔
34
    if (curTime < lastTime) lastTime = curTime;
116!
35
    float interval = (curTime - lastTime) / 1000.0f;
116✔
36
    if (interval >= tsStatusInterval) {
116✔
37
      dmSendStatusReq(pMgmt);
20✔
38
      lastTime = curTime;
20✔
39

40
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
20!
UNCOV
41
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
×
UNCOV
42
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
×
43
      }
44
    }
45
  }
46

47
  return NULL;
8✔
48
}
49

50
static void *dmConfigThreadFp(void *param) {
8✔
51
  SDnodeMgmt *pMgmt = param;
8✔
52
  int64_t     lastTime = taosGetTimestampMs();
8✔
53
  setThreadName("dnode-config");
8✔
54
  while (1) {
116✔
55
    taosMsleep(200);
124✔
56
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
124!
57

58
    int64_t curTime = taosGetTimestampMs();
116✔
59
    if (curTime < lastTime) lastTime = curTime;
116!
60
    float interval = (curTime - lastTime) / 1000.0f;
116✔
61
    if (interval >= tsStatusInterval) {
116✔
62
      dmSendConfigReq(pMgmt);
20✔
63
      lastTime = curTime;
20✔
64
    }
65
  }
66
  return NULL;
8✔
67
}
68

69
static void *dmStatusInfoThreadFp(void *param) {
8✔
70
  SDnodeMgmt *pMgmt = param;
16✔
71
  int64_t     lastTime = taosGetTimestampMs();
8✔
72
  setThreadName("dnode-status-info");
8✔
73

74
  int32_t upTimeCount = 0;
8✔
75
  int64_t upTime = 0;
8✔
76

77
  while (1) {
116✔
78
    taosMsleep(200);
124✔
79
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
124!
80

81
    int64_t curTime = taosGetTimestampMs();
116✔
82
    if (curTime < lastTime) lastTime = curTime;
116!
83
    float interval = (curTime - lastTime) / 1000.0f;
116✔
84
    if (interval >= tsStatusInterval) {
116✔
85
      dmUpdateStatusInfo(pMgmt);
20✔
86
      lastTime = curTime;
20✔
87

88
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
20!
UNCOV
89
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
×
UNCOV
90
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
×
91
      }
92
    }
93
  }
94

95
  return NULL;
8✔
96
}
97

98
#if defined(TD_ENTERPRISE)
99
SDmNotifyHandle dmNotifyHdl = {.state = 0};
100
#define TIMESERIES_STASH_NUM 5
101
static void *dmNotifyThreadFp(void *param) {
8✔
102
  SDnodeMgmt *pMgmt = param;
8✔
103
  int64_t     lastTime = taosGetTimestampMs();
8✔
104
  setThreadName("dnode-notify");
8✔
105

106
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
8!
UNCOV
107
    return NULL;
×
108
  }
109

110
  // calculate approximate timeSeries per second
111
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
112
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
113
  int64_t  approximateTimeSeries = 0;
8✔
114
  uint64_t nTotalNotify = 0;
8✔
115
  int32_t  head, tail = 0;
8✔
116

117
  bool       wait = true;
8✔
118
  int32_t    nDnode = 0;
8✔
119
  int64_t    lastNotify = 0;
8✔
120
  int64_t    lastFetchDnode = 0;
8✔
121
  SNotifyReq req = {0};
8✔
122
  while (1) {
8✔
123
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
16!
124
    if (wait) tsem_wait(&dmNotifyHdl.sem);
8!
125
    atomic_store_8(&dmNotifyHdl.state, 1);
8✔
126

127
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
8✔
128
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
8!
129
      goto _skip;
8✔
130
    }
131
    int64_t current = taosGetTimestampMs();
×
132
    if (current - lastFetchDnode > 1000) {
×
133
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
134
      if (nDnode < 1) nDnode = 1;
×
UNCOV
135
      lastFetchDnode = current;
×
136
    }
137
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
138
      req.dnodeId = pMgmt->pData->dnodeId;
×
UNCOV
139
      req.clusterId = pMgmt->pData->clusterId;
×
140
    }
141

142
    if (current - lastNotify < 10) {
×
143
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
144
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
145
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
146
        taosMsleep(10);
×
147
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
UNCOV
148
        taosMsleep(5);
×
149
      } else {
UNCOV
150
        taosMsleep(2);
×
151
      }
152
    }
153

154
    SMonVloadInfo vinfo = {0};
×
155
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
156
    req.pVloads = vinfo.pVloads;
×
157
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
158
    int64_t nTimeSeries = 0;
×
159
    for (int32_t i = 0; i < nVgroup; ++i) {
×
160
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
UNCOV
161
      nTimeSeries += vload->nTimeSeries;
×
162
    }
163
    notifyTimeSeries[tail] = nTimeSeries;
×
164
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
UNCOV
165
    ++nTotalNotify;
×
166

167
    approximateTimeSeries = 0;
×
168
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
169
      head = tail - TIMESERIES_STASH_NUM + 1;
×
170
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
171
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
172
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
173
      if (tsDiff > 0) {
×
174
        if (timeDiff > 0 && timeDiff < 1e9) {
×
175
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
176
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
UNCOV
177
            dmSendNotifyReq(pMgmt, &req);
×
178
          }
179
        } else {
UNCOV
180
          dmSendNotifyReq(pMgmt, &req);
×
181
        }
182
      }
183
    } else {
UNCOV
184
      dmSendNotifyReq(pMgmt, &req);
×
185
    }
UNCOV
186
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
187

188
    tFreeSNotifyReq(&req);
×
UNCOV
189
    lastNotify = taosGetTimestampMs();
×
190
  _skip:
8✔
191
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
8!
192
      wait = true;
8✔
193
      continue;
8✔
194
    }
UNCOV
195
    wait = false;
×
196
  }
197

198
  return NULL;
8✔
199
}
200
#endif
201

202
#ifdef USE_MONITOR
203
static void *dmMonitorThreadFp(void *param) {
8✔
204
  SDnodeMgmt *pMgmt = param;
8✔
205
  int64_t     lastTime = taosGetTimestampMs();
8✔
206
  int64_t     lastTimeForBasic = taosGetTimestampMs();
8✔
207
  setThreadName("dnode-monitor");
8✔
208

209
  static int32_t TRIM_FREQ = 20;
210
  int32_t        trimCount = 0;
8✔
211

212
  while (1) {
116✔
213
    taosMsleep(200);
124✔
214
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
124!
215

216
    int64_t curTime = taosGetTimestampMs();
116✔
217

218
    if (curTime < lastTime) lastTime = curTime;
116!
219
    float interval = (curTime - lastTime) / 1000.0f;
116✔
220
    if (interval >= tsMonitorInterval) {
116!
UNCOV
221
      (*pMgmt->sendMonitorReportFp)();
×
UNCOV
222
      (*pMgmt->monitorCleanExpiredSamplesFp)();
×
UNCOV
223
      lastTime = curTime;
×
224

UNCOV
225
      trimCount = (trimCount + 1) % TRIM_FREQ;
×
UNCOV
226
      if (trimCount == 0) {
×
UNCOV
227
        taosMemoryTrim(0, NULL);
×
228
      }
229
    }
230
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
116!
UNCOV
231
      taosMemoryTrim(0, NULL);
×
232
    }
233
  }
234

235
  return NULL;
8✔
236
}
237
#endif
238
#ifdef USE_AUDIT
239
static void *dmAuditThreadFp(void *param) {
8✔
240
  SDnodeMgmt *pMgmt = param;
8✔
241
  int64_t     lastTime = taosGetTimestampMs();
8✔
242
  setThreadName("dnode-audit");
8✔
243

244
  while (1) {
234✔
245
    taosMsleep(100);
242✔
246
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
242!
247

248
    int64_t curTime = taosGetTimestampMs();
234✔
249
    if (curTime < lastTime) lastTime = curTime;
234!
250
    float interval = curTime - lastTime;
234✔
251
    if (interval >= tsAuditInterval) {
234✔
252
      (*pMgmt->sendAuditRecordsFp)();
2✔
253
      lastTime = curTime;
2✔
254
    }
255
  }
256

257
  return NULL;
8✔
258
}
259
#endif
260
#ifdef USE_REPORT
261
static void *dmCrashReportThreadFp(void *param) {
×
262
  int32_t     code = 0;
×
263
  SDnodeMgmt *pMgmt = param;
×
264
  int64_t     lastTime = taosGetTimestampMs();
×
265
  setThreadName("dnode-crashReport");
×
266
  char filepath[PATH_MAX] = {0};
×
267
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
268
  char     *pMsg = NULL;
×
269
  int64_t   msgLen = 0;
×
UNCOV
270
  TdFilePtr pFile = NULL;
×
271
  bool      truncateFile = false;
×
272
  int32_t   sleepTime = 200;
×
273
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
274
  int32_t   loopTimes = reportPeriodNum;
×
275

UNCOV
276
  STelemAddrMgmt mgt = {0};
×
277
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
278
  if (code != 0) {
×
279
    dError("failed to init telemetry since %s", tstrerror(code));
×
280
    return NULL;
×
281
  }
UNCOV
282
  code = initCrashLogWriter();
×
UNCOV
283
  if (code != 0) {
×
284
    dError("failed to init crash log writer since %s", tstrerror(code));
×
285
    return NULL;
×
286
  }
287

288
  while (1) {
289
    checkAndPrepareCrashInfo();
×
290
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
291
      break;
×
292
    }
293
    if (loopTimes++ < reportPeriodNum) {
×
294
      taosMsleep(sleepTime);
×
295
      if(loopTimes < 0) loopTimes = reportPeriodNum;
×
296
      continue;
×
297
    }
298
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
299
    if (pMsg && msgLen > 0) {
×
UNCOV
300
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
301
        dError("failed to send crash report");
×
302
        if (pFile) {
×
303
          taosReleaseCrashLogFile(pFile, false);
×
UNCOV
304
          pFile = NULL;
×
305

306
          taosMsleep(sleepTime);
×
307
          loopTimes = 0;
×
UNCOV
308
          continue;
×
309
        }
310
      } else {
UNCOV
311
        dInfo("succeed to send crash report");
×
UNCOV
312
        truncateFile = true;
×
313
      }
314
    } else {
315
      dInfo("no crash info was found");
×
316
    }
317

UNCOV
318
    taosMemoryFree(pMsg);
×
319

320
    if (pMsg && msgLen > 0) {
×
321
      pMsg = NULL;
×
322
      continue;
×
323
    }
324

UNCOV
325
    if (pFile) {
×
326
      taosReleaseCrashLogFile(pFile, truncateFile);
×
327
      pFile = NULL;
×
UNCOV
328
      truncateFile = false;
×
329
    }
330

331
    taosMsleep(sleepTime);
×
UNCOV
332
    loopTimes = 0;
×
333
  }
UNCOV
334
  taosTelemetryDestroy(&mgt);
×
335

UNCOV
336
  return NULL;
×
337
}
338
#endif
339

340
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
8✔
341
  int32_t      code = 0;
8✔
342
  TdThreadAttr thAttr;
343
  (void)taosThreadAttrInit(&thAttr);
8✔
344
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
345
#ifdef TD_COMPACT_OS
346
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
347
#endif
348
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
8!
UNCOV
349
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
350
    dError("failed to create status thread since %s", tstrerror(code));
×
UNCOV
351
    return code;
×
352
  }
353

354
  (void)taosThreadAttrDestroy(&thAttr);
8✔
355
  tmsgReportStartup("dnode-status", "initialized");
8✔
356
  return 0;
8✔
357
}
358

359
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
8✔
360
  int32_t      code = 0;
8✔
361
  TdThreadAttr thAttr;
362
  (void)taosThreadAttrInit(&thAttr);
8✔
363
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
364
#ifdef TD_COMPACT_OS
365
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
366
#endif
367
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
8!
UNCOV
368
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
369
    dError("failed to create config thread since %s", tstrerror(code));
×
UNCOV
370
    return code;
×
371
  }
372

373
  (void)taosThreadAttrDestroy(&thAttr);
8✔
374
  tmsgReportStartup("config-status", "initialized");
8✔
375
  return 0;
8✔
376
}
377

378
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
8✔
379
  int32_t      code = 0;
8✔
380
  TdThreadAttr thAttr;
381
  (void)taosThreadAttrInit(&thAttr);
8✔
382
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
383
#ifdef TD_COMPACT_OS
384
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
385
#endif
386
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
8!
UNCOV
387
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
388
    dError("failed to create status Info thread since %s", tstrerror(code));
×
UNCOV
389
    return code;
×
390
  }
391

392
  (void)taosThreadAttrDestroy(&thAttr);
8✔
393
  tmsgReportStartup("dnode-status-info", "initialized");
8✔
394
  return 0;
8✔
395
}
396

397
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
8✔
398
  if (taosCheckPthreadValid(pMgmt->statusThread)) {
8!
399
    (void)taosThreadJoin(pMgmt->statusThread, NULL);
8✔
400
    taosThreadClear(&pMgmt->statusThread);
8✔
401
  }
402
}
8✔
403

404
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
8✔
405
  if (taosCheckPthreadValid(pMgmt->configThread)) {
8!
406
    (void)taosThreadJoin(pMgmt->configThread, NULL);
8✔
407
    taosThreadClear(&pMgmt->configThread);
8✔
408
  }
409
}
8✔
410

411
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
8✔
412
  if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
8!
413
    (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
8✔
414
    taosThreadClear(&pMgmt->statusInfoThread);
8✔
415
  }
416
}
8✔
417
#ifdef TD_ENTERPRISE
418
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
8✔
419
  int32_t      code = 0;
8✔
420
  TdThreadAttr thAttr;
421
  (void)taosThreadAttrInit(&thAttr);
8✔
422
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
423
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
8!
UNCOV
424
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
425
    dError("failed to create notify thread since %s", tstrerror(code));
×
UNCOV
426
    return code;
×
427
  }
428

429
  (void)taosThreadAttrDestroy(&thAttr);
8✔
430
  tmsgReportStartup("dnode-notify", "initialized");
8✔
431
  return 0;
8✔
432
}
433

434
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
8✔
435
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
8!
436
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
8!
UNCOV
437
      dError("failed to post notify sem");
×
438
    }
439

440
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
8✔
441
    taosThreadClear(&pMgmt->notifyThread);
8✔
442
  }
443
  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
8!
UNCOV
444
    dError("failed to destroy notify sem");
×
445
  }
446
}
8✔
447
#endif
448
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
8✔
449
  int32_t      code = 0;
8✔
450
#ifdef USE_MONITOR
451
  TdThreadAttr thAttr;
452
  (void)taosThreadAttrInit(&thAttr);
8✔
453
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
454
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
8!
455
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
456
    dError("failed to create monitor thread since %s", tstrerror(code));
×
457
    return code;
×
458
  }
459

460
  (void)taosThreadAttrDestroy(&thAttr);
8✔
461
  tmsgReportStartup("dnode-monitor", "initialized");
8✔
462
#endif
463
  return 0;
8✔
464
}
465

466
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
8✔
467
  int32_t      code = 0;
8✔
468
#ifdef USE_AUDIT  
469
  TdThreadAttr thAttr;
470
  (void)taosThreadAttrInit(&thAttr);
8✔
471
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8✔
472
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
8!
UNCOV
473
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
474
    dError("failed to create audit thread since %s", tstrerror(code));
×
UNCOV
475
    return code;
×
476
  }
477

478
  (void)taosThreadAttrDestroy(&thAttr);
8✔
479
  tmsgReportStartup("dnode-audit", "initialized");
8✔
480
#endif  
481
  return 0;
8✔
482
}
483

484
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
8✔
485
#ifdef USE_MONITOR
486
  if (taosCheckPthreadValid(pMgmt->monitorThread)) {
8!
487
    (void)taosThreadJoin(pMgmt->monitorThread, NULL);
8✔
488
    taosThreadClear(&pMgmt->monitorThread);
8✔
489
  }
490
#endif
491
}
8✔
492

493
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
8✔
494
#ifdef USE_AUDIT
495
  if (taosCheckPthreadValid(pMgmt->auditThread)) {
8!
496
    (void)taosThreadJoin(pMgmt->auditThread, NULL);
8✔
497
    taosThreadClear(&pMgmt->auditThread);
8✔
498
  }
499
#endif
500
}
8✔
501

502
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
8✔
503
  int32_t code = 0;
8✔
504
#ifdef USE_REPORT
505
  if (!tsEnableCrashReport) {
8!
506
    return 0;
8✔
507
  }
508

509
  TdThreadAttr thAttr;
UNCOV
510
  (void)taosThreadAttrInit(&thAttr);
×
UNCOV
511
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
UNCOV
512
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
×
UNCOV
513
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
514
    dError("failed to create crashReport thread since %s", tstrerror(code));
×
UNCOV
515
    return code;
×
516
  }
517

UNCOV
518
  (void)taosThreadAttrDestroy(&thAttr);
×
UNCOV
519
  tmsgReportStartup("dnode-crashReport", "initialized");
×
520
#endif
521
  return 0;
×
522
}
523

524
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
8✔
525
#ifdef USE_REPORT
526
  if (!tsEnableCrashReport) {
8!
527
    return;
8✔
528
  }
529

UNCOV
530
  if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
×
UNCOV
531
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
×
UNCOV
532
    taosThreadClear(&pMgmt->crashReportThread);
×
533
  }
534
#endif
535
}
536

537
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
8✔
538
  SDnodeMgmt *pMgmt = pInfo->ahandle;
8✔
539
  int32_t     code = -1;
8✔
540
  STraceId   *trace = &pMsg->info.traceId;
8✔
541
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
8!
542

543
  switch (pMsg->msgType) {
8!
UNCOV
544
    case TDMT_DND_CONFIG_DNODE:
×
UNCOV
545
      code = dmProcessConfigReq(pMgmt, pMsg);
×
UNCOV
546
      break;
×
UNCOV
547
    case TDMT_MND_AUTH_RSP:
×
UNCOV
548
      code = dmProcessAuthRsp(pMgmt, pMsg);
×
UNCOV
549
      break;
×
UNCOV
550
    case TDMT_MND_GRANT_RSP:
×
UNCOV
551
      code = dmProcessGrantRsp(pMgmt, pMsg);
×
UNCOV
552
      break;
×
UNCOV
553
    case TDMT_DND_CREATE_MNODE:
×
UNCOV
554
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
×
UNCOV
555
      break;
×
UNCOV
556
    case TDMT_DND_DROP_MNODE:
×
UNCOV
557
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
×
UNCOV
558
      break;
×
559
    case TDMT_DND_CREATE_QNODE:
5✔
560
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
5✔
561
      break;
5✔
562
    case TDMT_DND_DROP_QNODE:
3✔
563
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
3✔
564
      break;
3✔
565
    case TDMT_DND_CREATE_SNODE:
×
UNCOV
566
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
×
UNCOV
567
      break;
×
UNCOV
568
    case TDMT_DND_DROP_SNODE:
×
UNCOV
569
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
×
UNCOV
570
      break;
×
UNCOV
571
    case TDMT_DND_ALTER_MNODE_TYPE:
×
UNCOV
572
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
×
UNCOV
573
      break;
×
UNCOV
574
    case TDMT_DND_SERVER_STATUS:
×
UNCOV
575
      code = dmProcessServerRunStatus(pMgmt, pMsg);
×
UNCOV
576
      break;
×
UNCOV
577
    case TDMT_DND_SYSTABLE_RETRIEVE:
×
UNCOV
578
      code = dmProcessRetrieve(pMgmt, pMsg);
×
579
      break;
×
UNCOV
580
    case TDMT_MND_GRANT:
×
UNCOV
581
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
×
UNCOV
582
      break;
×
UNCOV
583
    case TDMT_MND_GRANT_NOTIFY:
×
UNCOV
584
      code = dmProcessGrantNotify(NULL, pMsg);
×
UNCOV
585
      break;
×
UNCOV
586
    case TDMT_DND_CREATE_ENCRYPT_KEY:
×
UNCOV
587
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
×
UNCOV
588
      break;
×
UNCOV
589
    default:
×
UNCOV
590
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
UNCOV
591
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
UNCOV
592
      break;
×
593
  }
594

595
  if (IsReq(pMsg)) {
8!
596
    if (code != 0 && terrno != 0) code = terrno;
8✔
597
    SRpcMsg rsp = {
8✔
598
        .code = code,
599
        .pCont = pMsg->info.rsp,
8✔
600
        .contLen = pMsg->info.rspLen,
8✔
601
        .info = pMsg->info,
602
    };
603

604
    code = rpcSendResponse(&rsp);
8✔
605
    if (code != 0) {
8!
UNCOV
606
      dError("failed to send response since %s", tstrerror(code));
×
607
    }
608
  }
609

610
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
8!
611
  rpcFreeCont(pMsg->pCont);
8✔
612
  taosFreeQitem(pMsg);
8✔
613
}
8✔
614

615
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
8✔
616
  int32_t          code = 0;
8✔
617
  SSingleWorkerCfg cfg = {
8✔
618
      .min = 1,
619
      .max = 1,
620
      .name = "dnode-mgmt",
621
      .fp = (FItem)dmProcessMgmtQueue,
622
      .param = pMgmt,
623
  };
624
  if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
8!
UNCOV
625
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
×
UNCOV
626
    return code;
×
627
  }
628

629
  dDebug("dnode workers are initialized");
8!
630
  return 0;
8✔
631
}
632

633
void dmStopWorker(SDnodeMgmt *pMgmt) {
8✔
634
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
8✔
635
  dDebug("dnode workers are closed");
8!
636
}
8✔
637

638
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
8✔
639
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
8✔
640
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
8!
641
  return taosWriteQitem(pWorker->queue, pMsg);
8✔
642
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc