• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3563

21 Dec 2024 05:37AM UTC coverage: 61.045% (+34.4%) from 26.655%
#3563

push

travis-ci

web-flow
Merge pull request #29256 from taosdata/merge/mainto3.0

merge: from main t 3.0

135730 of 287838 branches covered (47.15%)

Branch coverage included in aggregate %.

9 of 28 new or added lines in 5 files covered. (32.14%)

784 existing lines in 21 files now uncovered.

213302 of 283921 relevant lines covered (75.13%)

9176355.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.95
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
1,715✔
22
  SDnodeMgmt *pMgmt = param;
1,715✔
23
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
24
  setThreadName("dnode-status");
1,715✔
25

26
  int32_t upTimeCount = 0;
1,715✔
27
  int64_t upTime = 0;
1,715✔
28

29
  while (1) {
383,678✔
30
    taosMsleep(200);
385,393✔
31
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
385,393!
32

33
    int64_t curTime = taosGetTimestampMs();
383,678✔
34
    if (curTime < lastTime) lastTime = curTime;
383,678!
35
    float interval = (curTime - lastTime) / 1000.0f;
383,678✔
36
    if (interval >= tsStatusInterval) {
383,678✔
37
      dmSendStatusReq(pMgmt);
76,389✔
38
      lastTime = curTime;
76,389✔
39

40
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
76,389✔
41
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
768✔
42
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
768✔
43
      }
44
    }
45
  }
46

47
  return NULL;
1,715✔
48
}
49

50
static void *dmConfigThreadFp(void *param) {
1,715✔
51
  SDnodeMgmt *pMgmt = param;
1,715✔
52
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
53
  setThreadName("dnode-config");
1,715✔
54
  while (1) {
8,450✔
55
    taosMsleep(200);
10,165✔
56
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
10,165!
57

58
    int64_t curTime = taosGetTimestampMs();
8,450✔
59
    if (curTime < lastTime) lastTime = curTime;
8,450!
60
    float interval = (curTime - lastTime) / 1000.0f;
8,450✔
61
    if (interval >= tsStatusInterval) {
8,450✔
62
      dmSendConfigReq(pMgmt);
1,651✔
63
      lastTime = curTime;
1,651✔
64
    }
65
  }
66
  return NULL;
1,715✔
67
}
68

69
static void *dmStatusInfoThreadFp(void *param) {
1,715✔
70
  SDnodeMgmt *pMgmt = param;
3,430✔
71
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
72
  setThreadName("dnode-status-info");
1,715✔
73

74
  int32_t upTimeCount = 0;
1,715✔
75
  int64_t upTime = 0;
1,715✔
76

77
  while (1) {
391,446✔
78
    taosMsleep(200);
393,161✔
79
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
393,161!
80

81
    int64_t curTime = taosGetTimestampMs();
391,446✔
82
    if (curTime < lastTime) lastTime = curTime;
391,446!
83
    float interval = (curTime - lastTime) / 1000.0f;
391,446✔
84
    if (interval >= tsStatusInterval) {
391,446✔
85
      dmUpdateStatusInfo(pMgmt);
77,630✔
86
      lastTime = curTime;
77,630✔
87

88
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
77,630✔
89
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
784✔
90
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
784✔
91
      }
92
    }
93
  }
94

95
  return NULL;
1,715✔
96
}
97

98
SDmNotifyHandle dmNotifyHdl = {.state = 0};
99
#define TIMESERIES_STASH_NUM 5
100
static void *dmNotifyThreadFp(void *param) {
1,715✔
101
  SDnodeMgmt *pMgmt = param;
1,715✔
102
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
103
  setThreadName("dnode-notify");
1,715✔
104

105
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
1,715!
106
    return NULL;
×
107
  }
108

109
  // calculate approximate timeSeries per second
110
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
111
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
112
  int64_t  approximateTimeSeries = 0;
1,715✔
113
  uint64_t nTotalNotify = 0;
1,715✔
114
  int32_t  head, tail = 0;
1,715✔
115

116
  bool       wait = true;
1,715✔
117
  int32_t    nDnode = 0;
1,715✔
118
  int64_t    lastNotify = 0;
1,715✔
119
  int64_t    lastFetchDnode = 0;
1,715✔
120
  SNotifyReq req = {0};
1,715✔
121
  while (1) {
1,723✔
122
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
3,438!
123
    if (wait) tsem_wait(&dmNotifyHdl.sem);
1,723!
124
    atomic_store_8(&dmNotifyHdl.state, 1);
1,723✔
125

126
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
1,723✔
127
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
1,723!
128
      goto _skip;
1,723✔
129
    }
130
    int64_t current = taosGetTimestampMs();
×
131
    if (current - lastFetchDnode > 1000) {
×
132
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
133
      if (nDnode < 1) nDnode = 1;
×
134
      lastFetchDnode = current;
×
135
    }
136
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
137
      req.dnodeId = pMgmt->pData->dnodeId;
×
138
      req.clusterId = pMgmt->pData->clusterId;
×
139
    }
140

141
    if (current - lastNotify < 10) {
×
142
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
143
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
144
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
145
        taosMsleep(10);
×
146
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
147
        taosMsleep(5);
×
148
      } else {
149
        taosMsleep(2);
×
150
      }
151
    }
152

153
    SMonVloadInfo vinfo = {0};
×
154
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
155
    req.pVloads = vinfo.pVloads;
×
156
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
157
    int64_t nTimeSeries = 0;
×
158
    for (int32_t i = 0; i < nVgroup; ++i) {
×
159
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
160
      nTimeSeries += vload->nTimeSeries;
×
161
    }
162
    notifyTimeSeries[tail] = nTimeSeries;
×
163
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
164
    ++nTotalNotify;
×
165

166
    approximateTimeSeries = 0;
×
167
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
168
      head = tail - TIMESERIES_STASH_NUM + 1;
×
169
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
170
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
171
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
172
      if (tsDiff > 0) {
×
173
        if (timeDiff > 0 && timeDiff < 1e9) {
×
174
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
175
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
176
            dmSendNotifyReq(pMgmt, &req);
×
177
          }
178
        } else {
179
          dmSendNotifyReq(pMgmt, &req);
×
180
        }
181
      }
182
    } else {
183
      dmSendNotifyReq(pMgmt, &req);
×
184
    }
185
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
186

187
    tFreeSNotifyReq(&req);
×
188
    lastNotify = taosGetTimestampMs();
×
189
  _skip:
1,723✔
190
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
1,723!
191
      wait = true;
1,723✔
192
      continue;
1,723✔
193
    }
194
    wait = false;
×
195
  }
196

197
  return NULL;
1,715✔
198
}
199

200
static void *dmMonitorThreadFp(void *param) {
1,715✔
201
  SDnodeMgmt *pMgmt = param;
1,715✔
202
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
203
  int64_t     lastTimeForBasic = taosGetTimestampMs();
1,715✔
204
  setThreadName("dnode-monitor");
1,715✔
205

206
  static int32_t TRIM_FREQ = 20;
207
  int32_t        trimCount = 0;
1,715✔
208

209
  while (1) {
391,141✔
210
    taosMsleep(200);
392,856✔
211
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
392,856!
212

213
    int64_t curTime = taosGetTimestampMs();
391,141✔
214

215
    if (curTime < lastTime) lastTime = curTime;
391,141!
216
    float interval = (curTime - lastTime) / 1000.0f;
391,141✔
217
    if (interval >= tsMonitorInterval) {
391,141✔
218
      (*pMgmt->sendMonitorReportFp)();
2,072✔
219
      (*pMgmt->monitorCleanExpiredSamplesFp)();
2,072✔
220
      lastTime = curTime;
2,072✔
221

222
      trimCount = (trimCount + 1) % TRIM_FREQ;
2,072✔
223
      if (trimCount == 0) {
2,072✔
224
        taosMemoryTrim(0, NULL);
11!
225
      }
226
    }
227
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
391,141✔
228
      taosMemoryTrim(0, NULL);
14,562!
229
    }
230
  }
231

232
  return NULL;
1,715✔
233
}
234

235
static void *dmAuditThreadFp(void *param) {
1,715✔
236
  SDnodeMgmt *pMgmt = param;
1,715✔
237
  int64_t     lastTime = taosGetTimestampMs();
1,715✔
238
  setThreadName("dnode-audit");
1,715✔
239

240
  while (1) {
783,063✔
241
    taosMsleep(100);
784,778✔
242
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
784,778!
243

244
    int64_t curTime = taosGetTimestampMs();
783,063✔
245
    if (curTime < lastTime) lastTime = curTime;
783,063!
246
    float interval = curTime - lastTime;
783,063✔
247
    if (interval >= tsAuditInterval) {
783,063✔
248
      (*pMgmt->sendAuditRecordsFp)();
14,871✔
249
      lastTime = curTime;
14,871✔
250
    }
251
  }
252

253
  return NULL;
1,715✔
254
}
255

256
static void *dmCrashReportThreadFp(void *param) {
×
257
  int32_t     code = 0;
×
258
  SDnodeMgmt *pMgmt = param;
×
259
  int64_t     lastTime = taosGetTimestampMs();
×
260
  setThreadName("dnode-crashReport");
×
261
  char filepath[PATH_MAX] = {0};
×
262
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
263
  char     *pMsg = NULL;
×
264
  int64_t   msgLen = 0;
×
265
  TdFilePtr pFile = NULL;
×
266
  bool      truncateFile = false;
×
267
  int32_t   sleepTime = 200;
×
268
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
269
  int32_t   loopTimes = reportPeriodNum;
×
270

271
  STelemAddrMgmt mgt = {0};
×
272
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
273
  if (code != 0) {
×
274
    dError("failed to init telemetry since %s", tstrerror(code));
×
275
    return NULL;
×
276
  }
277

278
  while (1) {
279
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
280
    if (loopTimes++ < reportPeriodNum) {
×
281
      taosMsleep(sleepTime);
×
282
      continue;
×
283
    }
284

285
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
286
    if (pMsg && msgLen > 0) {
×
287
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
288
        dError("failed to send crash report");
×
289
        if (pFile) {
×
290
          taosReleaseCrashLogFile(pFile, false);
×
291
          pFile = NULL;
×
292

293
          taosMsleep(sleepTime);
×
294
          loopTimes = 0;
×
295
          continue;
×
296
        }
297
      } else {
298
        dInfo("succeed to send crash report");
×
299
        truncateFile = true;
×
300
      }
301
    } else {
302
      dDebug("no crash info");
×
303
    }
304

305
    taosMemoryFree(pMsg);
×
306

307
    if (pMsg && msgLen > 0) {
×
308
      pMsg = NULL;
×
309
      continue;
×
310
    }
311

312
    if (pFile) {
×
313
      taosReleaseCrashLogFile(pFile, truncateFile);
×
314
      pFile = NULL;
×
315
      truncateFile = false;
×
316
    }
317

318
    taosMsleep(sleepTime);
×
319
    loopTimes = 0;
×
320
  }
321
  taosTelemetryDestroy(&mgt);
×
322

323
  return NULL;
×
324
}
325

326
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
1,715✔
327
  int32_t      code = 0;
1,715✔
328
  TdThreadAttr thAttr;
329
  (void)taosThreadAttrInit(&thAttr);
1,715✔
330
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
331
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
1,715!
332
    code = TAOS_SYSTEM_ERROR(errno);
×
333
    dError("failed to create status thread since %s", tstrerror(code));
×
334
    return code;
×
335
  }
336

337
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
338
  tmsgReportStartup("dnode-status", "initialized");
1,715✔
339
  return 0;
1,715✔
340
}
341

342
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
1,715✔
343
  int32_t      code = 0;
1,715✔
344
  TdThreadAttr thAttr;
345
  (void)taosThreadAttrInit(&thAttr);
1,715✔
346
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
347
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
1,715!
348
    code = TAOS_SYSTEM_ERROR(errno);
×
349
    dError("failed to create config thread since %s", tstrerror(code));
×
350
    return code;
×
351
  }
352

353
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
354
  tmsgReportStartup("config-status", "initialized");
1,715✔
355
  return 0;
1,715✔
356
}
357

358
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
1,715✔
359
  int32_t      code = 0;
1,715✔
360
  TdThreadAttr thAttr;
361
  (void)taosThreadAttrInit(&thAttr);
1,715✔
362
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
363
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
1,715!
364
    code = TAOS_SYSTEM_ERROR(errno);
×
365
    dError("failed to create status Info thread since %s", tstrerror(code));
×
366
    return code;
×
367
  }
368

369
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
370
  tmsgReportStartup("dnode-status-info", "initialized");
1,715✔
371
  return 0;
1,715✔
372
}
373

374
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
1,715✔
375
  if (taosCheckPthreadValid(pMgmt->statusThread)) {
1,715!
376
    (void)taosThreadJoin(pMgmt->statusThread, NULL);
1,715✔
377
    taosThreadClear(&pMgmt->statusThread);
1,715✔
378
  }
379
}
1,715✔
380

381
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
1,715✔
382
  if (taosCheckPthreadValid(pMgmt->configThread)) {
1,715!
383
    (void)taosThreadJoin(pMgmt->configThread, NULL);
1,715✔
384
    taosThreadClear(&pMgmt->configThread);
1,715✔
385
  }
386
}
1,715✔
387

388
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
1,715✔
389
  if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
1,715!
390
    (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
1,715✔
391
    taosThreadClear(&pMgmt->statusInfoThread);
1,715✔
392
  }
393
}
1,715✔
394

395
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
1,715✔
396
  int32_t      code = 0;
1,715✔
397
  TdThreadAttr thAttr;
398
  (void)taosThreadAttrInit(&thAttr);
1,715✔
399
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
400
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
1,715!
UNCOV
401
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
402
    dError("failed to create notify thread since %s", tstrerror(code));
×
UNCOV
403
    return code;
×
404
  }
405

406
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
407
  tmsgReportStartup("dnode-notify", "initialized");
1,715✔
408
  return 0;
1,715✔
409
}
410

411
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
1,715✔
412
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
1,715!
413
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
1,715!
414
      dError("failed to post notify sem");
×
415
    }
416

417
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
1,715✔
418
    taosThreadClear(&pMgmt->notifyThread);
1,715✔
419
  }
420
  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
1,715!
UNCOV
421
    dError("failed to destroy notify sem");
×
422
  }
423
}
1,715✔
424

425
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
1,715✔
426
  int32_t      code = 0;
1,715✔
427
  TdThreadAttr thAttr;
428
  (void)taosThreadAttrInit(&thAttr);
1,715✔
429
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
430
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
1,715!
UNCOV
431
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
432
    dError("failed to create monitor thread since %s", tstrerror(code));
×
UNCOV
433
    return code;
×
434
  }
435

436
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
437
  tmsgReportStartup("dnode-monitor", "initialized");
1,715✔
438
  return 0;
1,715✔
439
}
440

441
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
1,715✔
442
  int32_t      code = 0;
1,715✔
443
  TdThreadAttr thAttr;
444
  (void)taosThreadAttrInit(&thAttr);
1,715✔
445
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,715✔
446
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
1,715!
UNCOV
447
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
448
    dError("failed to create audit thread since %s", tstrerror(code));
×
UNCOV
449
    return code;
×
450
  }
451

452
  (void)taosThreadAttrDestroy(&thAttr);
1,715✔
453
  tmsgReportStartup("dnode-audit", "initialized");
1,715✔
454
  return 0;
1,715✔
455
}
456

457
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
1,715✔
458
  if (taosCheckPthreadValid(pMgmt->monitorThread)) {
1,715!
459
    (void)taosThreadJoin(pMgmt->monitorThread, NULL);
1,715✔
460
    taosThreadClear(&pMgmt->monitorThread);
1,715✔
461
  }
462
}
1,715✔
463

464
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
1,715✔
465
  if (taosCheckPthreadValid(pMgmt->auditThread)) {
1,715!
466
    (void)taosThreadJoin(pMgmt->auditThread, NULL);
1,715✔
467
    taosThreadClear(&pMgmt->auditThread);
1,715✔
468
  }
469
}
1,715✔
470

471
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
1,715✔
472
  int32_t code = 0;
1,715✔
473
  if (!tsEnableCrashReport) {
1,715!
474
    return 0;
1,715✔
475
  }
476

477
  TdThreadAttr thAttr;
UNCOV
478
  (void)taosThreadAttrInit(&thAttr);
×
479
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
480
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
×
481
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
482
    dError("failed to create crashReport thread since %s", tstrerror(code));
×
UNCOV
483
    return code;
×
484
  }
485

UNCOV
486
  (void)taosThreadAttrDestroy(&thAttr);
×
UNCOV
487
  tmsgReportStartup("dnode-crashReport", "initialized");
×
UNCOV
488
  return 0;
×
489
}
490

491
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
1,715✔
492
  if (!tsEnableCrashReport) {
1,715!
493
    return;
1,715✔
494
  }
495

UNCOV
496
  if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
×
UNCOV
497
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
×
UNCOV
498
    taosThreadClear(&pMgmt->crashReportThread);
×
499
  }
500
}
501

502
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
154,173✔
503
  SDnodeMgmt *pMgmt = pInfo->ahandle;
154,173✔
504
  int32_t     code = -1;
154,173✔
505
  STraceId   *trace = &pMsg->info.traceId;
154,173✔
506
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
154,173!
507

508
  switch (pMsg->msgType) {
154,173!
509
    case TDMT_DND_CONFIG_DNODE:
521✔
510
      code = dmProcessConfigReq(pMgmt, pMsg);
521✔
511
      break;
521✔
512
    case TDMT_MND_AUTH_RSP:
×
513
      code = dmProcessAuthRsp(pMgmt, pMsg);
×
514
      break;
×
515
    case TDMT_MND_GRANT_RSP:
×
516
      code = dmProcessGrantRsp(pMgmt, pMsg);
×
UNCOV
517
      break;
×
518
    case TDMT_DND_CREATE_MNODE:
37✔
519
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
37✔
520
      break;
37✔
521
    case TDMT_DND_DROP_MNODE:
5✔
522
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
5✔
523
      break;
5✔
524
    case TDMT_DND_CREATE_QNODE:
432✔
525
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
432✔
526
      break;
432✔
527
    case TDMT_DND_DROP_QNODE:
9✔
528
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
9✔
529
      break;
9✔
530
    case TDMT_DND_CREATE_SNODE:
18✔
531
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
18✔
532
      break;
18✔
533
    case TDMT_DND_DROP_SNODE:
4✔
534
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
4✔
535
      break;
4✔
536
    case TDMT_DND_ALTER_MNODE_TYPE:
419✔
537
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
419✔
538
      break;
419✔
539
    case TDMT_DND_SERVER_STATUS:
3✔
540
      code = dmProcessServerRunStatus(pMgmt, pMsg);
3✔
541
      break;
3✔
542
    case TDMT_DND_SYSTABLE_RETRIEVE:
496✔
543
      code = dmProcessRetrieve(pMgmt, pMsg);
496✔
544
      break;
496✔
545
    case TDMT_MND_GRANT:
1,416✔
546
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
1,416✔
547
      break;
1,416✔
548
    case TDMT_MND_GRANT_NOTIFY:
150,813✔
549
      code = dmProcessGrantNotify(NULL, pMsg);
150,813✔
550
      break;
150,813✔
UNCOV
551
    case TDMT_DND_CREATE_ENCRYPT_KEY:
×
UNCOV
552
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
×
UNCOV
553
      break;
×
UNCOV
554
    default:
×
UNCOV
555
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
UNCOV
556
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
UNCOV
557
      break;
×
558
  }
559

560
  if (IsReq(pMsg)) {
154,173!
561
    if (code != 0 && terrno != 0) code = terrno;
154,173✔
562
    SRpcMsg rsp = {
154,173✔
563
        .code = code,
564
        .pCont = pMsg->info.rsp,
154,173✔
565
        .contLen = pMsg->info.rspLen,
154,173✔
566
        .info = pMsg->info,
567
    };
568

569
    code = rpcSendResponse(&rsp);
154,173✔
570
    if (code != 0) {
154,173!
UNCOV
571
      dError("failed to send response since %s", tstrerror(code));
×
572
    }
573
  }
574

575
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
154,173✔
576
  rpcFreeCont(pMsg->pCont);
154,173✔
577
  taosFreeQitem(pMsg);
154,173✔
578
}
154,173✔
579

580
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
1,715✔
581
  int32_t          code = 0;
1,715✔
582
  SSingleWorkerCfg cfg = {
1,715✔
583
      .min = 1,
584
      .max = 1,
585
      .name = "dnode-mgmt",
586
      .fp = (FItem)dmProcessMgmtQueue,
587
      .param = pMgmt,
588
  };
589
  if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
1,715!
UNCOV
590
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
×
UNCOV
591
    return code;
×
592
  }
593

594
  dDebug("dnode workers are initialized");
1,715✔
595
  return 0;
1,715✔
596
}
597

598
void dmStopWorker(SDnodeMgmt *pMgmt) {
1,715✔
599
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
1,715✔
600
  dDebug("dnode workers are closed");
1,715✔
601
}
1,715✔
602

603
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
154,154✔
604
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
154,154✔
605
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
154,154✔
606
  return taosWriteQitem(pWorker->queue, pMsg);
154,154✔
607
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc