• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

UNCOV
21
static void *dmStatusThreadFp(void *param) {
×
UNCOV
22
  SDnodeMgmt *pMgmt = param;
×
UNCOV
23
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
24
  setThreadName("dnode-status");
×
25

UNCOV
26
  int32_t upTimeCount = 0;
×
UNCOV
27
  int64_t upTime = 0;
×
28

UNCOV
29
  while (1) {
×
UNCOV
30
    taosMsleep(200);
×
UNCOV
31
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
32

UNCOV
33
    int64_t curTime = taosGetTimestampMs();
×
UNCOV
34
    if (curTime < lastTime) lastTime = curTime;
×
UNCOV
35
    float interval = (curTime - lastTime) / 1000.0f;
×
UNCOV
36
    if (interval >= tsStatusInterval) {
×
UNCOV
37
      dmSendStatusReq(pMgmt);
×
UNCOV
38
      lastTime = curTime;
×
39

UNCOV
40
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
×
UNCOV
41
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
×
UNCOV
42
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
×
43
      }
44
    }
45
  }
46

UNCOV
47
  return NULL;
×
48
}
49

UNCOV
50
static void *dmConfigThreadFp(void *param) {
×
UNCOV
51
  SDnodeMgmt *pMgmt = param;
×
UNCOV
52
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
53
  setThreadName("dnode-config");
×
UNCOV
54
  while (1) {
×
UNCOV
55
    taosMsleep(200);
×
UNCOV
56
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
×
57

UNCOV
58
    int64_t curTime = taosGetTimestampMs();
×
UNCOV
59
    if (curTime < lastTime) lastTime = curTime;
×
UNCOV
60
    float interval = (curTime - lastTime) / 1000.0f;
×
UNCOV
61
    if (interval >= tsStatusInterval) {
×
UNCOV
62
      dmSendConfigReq(pMgmt);
×
UNCOV
63
      lastTime = curTime;
×
64
    }
65
  }
UNCOV
66
  return NULL;
×
67
}
68

UNCOV
69
static void *dmStatusInfoThreadFp(void *param) {
×
UNCOV
70
  SDnodeMgmt *pMgmt = param;
×
UNCOV
71
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
72
  setThreadName("dnode-status-info");
×
73

UNCOV
74
  int32_t upTimeCount = 0;
×
UNCOV
75
  int64_t upTime = 0;
×
76

UNCOV
77
  while (1) {
×
UNCOV
78
    taosMsleep(200);
×
UNCOV
79
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
80

UNCOV
81
    int64_t curTime = taosGetTimestampMs();
×
UNCOV
82
    if (curTime < lastTime) lastTime = curTime;
×
UNCOV
83
    float interval = (curTime - lastTime) / 1000.0f;
×
UNCOV
84
    if (interval >= tsStatusInterval) {
×
UNCOV
85
      dmUpdateStatusInfo(pMgmt);
×
UNCOV
86
      lastTime = curTime;
×
87

UNCOV
88
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
×
UNCOV
89
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
×
UNCOV
90
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
×
91
      }
92
    }
93
  }
94

UNCOV
95
  return NULL;
×
96
}
97

98
SDmNotifyHandle dmNotifyHdl = {.state = 0};
99
#define TIMESERIES_STASH_NUM 5
UNCOV
100
static void *dmNotifyThreadFp(void *param) {
×
UNCOV
101
  SDnodeMgmt *pMgmt = param;
×
UNCOV
102
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
103
  setThreadName("dnode-notify");
×
104

UNCOV
105
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
×
106
    return NULL;
×
107
  }
108

109
  // calculate approximate timeSeries per second
110
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
111
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
UNCOV
112
  int64_t  approximateTimeSeries = 0;
×
UNCOV
113
  uint64_t nTotalNotify = 0;
×
UNCOV
114
  int32_t  head, tail = 0;
×
115

UNCOV
116
  bool       wait = true;
×
UNCOV
117
  int32_t    nDnode = 0;
×
UNCOV
118
  int64_t    lastNotify = 0;
×
UNCOV
119
  int64_t    lastFetchDnode = 0;
×
UNCOV
120
  SNotifyReq req = {0};
×
UNCOV
121
  while (1) {
×
UNCOV
122
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
UNCOV
123
    if (wait) tsem_wait(&dmNotifyHdl.sem);
×
UNCOV
124
    atomic_store_8(&dmNotifyHdl.state, 1);
×
125

UNCOV
126
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
×
UNCOV
127
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
×
UNCOV
128
      goto _skip;
×
129
    }
130
    int64_t current = taosGetTimestampMs();
×
131
    if (current - lastFetchDnode > 1000) {
×
132
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
133
      if (nDnode < 1) nDnode = 1;
×
134
      lastFetchDnode = current;
×
135
    }
136
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
137
      req.dnodeId = pMgmt->pData->dnodeId;
×
138
      req.clusterId = pMgmt->pData->clusterId;
×
139
    }
140

141
    if (current - lastNotify < 10) {
×
142
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
143
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
144
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
145
        taosMsleep(10);
×
146
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
147
        taosMsleep(5);
×
148
      } else {
149
        taosMsleep(2);
×
150
      }
151
    }
152

153
    SMonVloadInfo vinfo = {0};
×
154
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
155
    req.pVloads = vinfo.pVloads;
×
156
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
157
    int64_t nTimeSeries = 0;
×
158
    for (int32_t i = 0; i < nVgroup; ++i) {
×
159
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
160
      nTimeSeries += vload->nTimeSeries;
×
161
    }
162
    notifyTimeSeries[tail] = nTimeSeries;
×
163
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
164
    ++nTotalNotify;
×
165

166
    approximateTimeSeries = 0;
×
167
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
168
      head = tail - TIMESERIES_STASH_NUM + 1;
×
169
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
170
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
171
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
172
      if (tsDiff > 0) {
×
173
        if (timeDiff > 0 && timeDiff < 1e9) {
×
174
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
175
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
176
            dmSendNotifyReq(pMgmt, &req);
×
177
          }
178
        } else {
179
          dmSendNotifyReq(pMgmt, &req);
×
180
        }
181
      }
182
    } else {
183
      dmSendNotifyReq(pMgmt, &req);
×
184
    }
185
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
186

187
    tFreeSNotifyReq(&req);
×
188
    lastNotify = taosGetTimestampMs();
×
UNCOV
189
  _skip:
×
UNCOV
190
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
×
UNCOV
191
      wait = true;
×
UNCOV
192
      continue;
×
193
    }
UNCOV
194
    wait = false;
×
195
  }
196

UNCOV
197
  return NULL;
×
198
}
199

UNCOV
200
static void *dmMonitorThreadFp(void *param) {
×
UNCOV
201
  SDnodeMgmt *pMgmt = param;
×
UNCOV
202
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
203
  int64_t     lastTimeForBasic = taosGetTimestampMs();
×
UNCOV
204
  setThreadName("dnode-monitor");
×
205

206
  static int32_t TRIM_FREQ = 20;
UNCOV
207
  int32_t        trimCount = 0;
×
208

UNCOV
209
  while (1) {
×
UNCOV
210
    taosMsleep(200);
×
UNCOV
211
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
212

UNCOV
213
    int64_t curTime = taosGetTimestampMs();
×
214

UNCOV
215
    if (curTime < lastTime) lastTime = curTime;
×
UNCOV
216
    float interval = (curTime - lastTime) / 1000.0f;
×
UNCOV
217
    if (interval >= tsMonitorInterval) {
×
UNCOV
218
      (*pMgmt->sendMonitorReportFp)();
×
UNCOV
219
      (*pMgmt->monitorCleanExpiredSamplesFp)();
×
UNCOV
220
      lastTime = curTime;
×
221

UNCOV
222
      trimCount = (trimCount + 1) % TRIM_FREQ;
×
UNCOV
223
      if (trimCount == 0) {
×
UNCOV
224
        taosMemoryTrim(0, NULL);
×
225
      }
226
    }
UNCOV
227
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
×
UNCOV
228
      taosMemoryTrim(0, NULL);
×
229
    }
230
  }
231

UNCOV
232
  return NULL;
×
233
}
234

UNCOV
235
static void *dmAuditThreadFp(void *param) {
×
UNCOV
236
  SDnodeMgmt *pMgmt = param;
×
UNCOV
237
  int64_t     lastTime = taosGetTimestampMs();
×
UNCOV
238
  setThreadName("dnode-audit");
×
239

UNCOV
240
  while (1) {
×
UNCOV
241
    taosMsleep(100);
×
UNCOV
242
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
243

UNCOV
244
    int64_t curTime = taosGetTimestampMs();
×
UNCOV
245
    if (curTime < lastTime) lastTime = curTime;
×
UNCOV
246
    float interval = curTime - lastTime;
×
UNCOV
247
    if (interval >= tsAuditInterval) {
×
UNCOV
248
      (*pMgmt->sendAuditRecordsFp)();
×
UNCOV
249
      lastTime = curTime;
×
250
    }
251
  }
252

UNCOV
253
  return NULL;
×
254
}
255

256
static void *dmCrashReportThreadFp(void *param) {
×
257
  int32_t     code = 0;
×
258
  SDnodeMgmt *pMgmt = param;
×
259
  int64_t     lastTime = taosGetTimestampMs();
×
260
  setThreadName("dnode-crashReport");
×
261
  char filepath[PATH_MAX] = {0};
×
262
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
263
  char     *pMsg = NULL;
×
264
  int64_t   msgLen = 0;
×
265
  TdFilePtr pFile = NULL;
×
266
  bool      truncateFile = false;
×
267
  int32_t   sleepTime = 200;
×
268
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
269
  int32_t   loopTimes = reportPeriodNum;
×
270

271
  STelemAddrMgmt mgt = {0};
×
272
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
273
  if (code != 0) {
×
274
    dError("failed to init telemetry since %s", tstrerror(code));
×
275
    return NULL;
×
276
  }
277
  code = initCrashLogWriter();
×
278
  if (code != 0) {
×
279
    dError("failed to init crash log writer since %s", tstrerror(code));
×
280
    return NULL;
×
281
  }
282

283
  while (1) {
284
    checkAndPrepareCrashInfo();
×
285
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
286
      break;
×
287
    }
288
    if (loopTimes++ < reportPeriodNum) {
×
289
      taosMsleep(sleepTime);
×
290
      if(loopTimes < 0) loopTimes = reportPeriodNum;
×
291
      continue;
×
292
    }
293
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
294
    if (pMsg && msgLen > 0) {
×
295
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
296
        dError("failed to send crash report");
×
297
        if (pFile) {
×
298
          taosReleaseCrashLogFile(pFile, false);
×
299
          pFile = NULL;
×
300

301
          taosMsleep(sleepTime);
×
302
          loopTimes = 0;
×
303
          continue;
×
304
        }
305
      } else {
306
        dInfo("succeed to send crash report");
×
307
        truncateFile = true;
×
308
      }
309
    } else {
310
      dDebug("no crash info");
×
311
    }
312

313
    taosMemoryFree(pMsg);
×
314

315
    if (pMsg && msgLen > 0) {
×
316
      pMsg = NULL;
×
317
      continue;
×
318
    }
319

320
    if (pFile) {
×
321
      taosReleaseCrashLogFile(pFile, truncateFile);
×
322
      pFile = NULL;
×
323
      truncateFile = false;
×
324
    }
325

326
    taosMsleep(sleepTime);
×
327
    loopTimes = 0;
×
328
  }
329
  taosTelemetryDestroy(&mgt);
×
330

331
  return NULL;
×
332
}
333

UNCOV
334
// Creates the joinable "dnode-status" thread running dmStatusThreadFp and
// stores its handle in pMgmt->statusThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
349

UNCOV
350
// Creates the joinable config thread running dmConfigThreadFp and stores its
// handle in pMgmt->configThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  // NOTE(review): the startup label is "config-status" while the thread names
  // itself "dnode-config" — confirm which is intended.
  tmsgReportStartup("config-status", "initialized");
  return 0;
}
365

UNCOV
366
// Creates the joinable "dnode-status-info" thread running dmStatusInfoThreadFp
// and stores its handle in pMgmt->statusInfoThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
381

UNCOV
382
// Joins the status thread and clears its handle; does nothing when the
// handle in pMgmt->statusThread is not valid.
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
×
388

UNCOV
389
// Joins the config thread and clears its handle; does nothing when the
// handle in pMgmt->configThread is not valid.
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
×
395

UNCOV
396
// Joins the status-info thread and clears its handle; does nothing when the
// handle in pMgmt->statusInfoThread is not valid.
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
×
402

UNCOV
403
// Creates the joinable "dnode-notify" thread running dmNotifyThreadFp and
// stores its handle in pMgmt->notifyThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
418

UNCOV
419
// Stops the notify thread: if its handle is valid, posts dmNotifyHdl.sem so a
// thread blocked in tsem_wait() can wake up, then joins it and clears the
// handle. The semaphore is destroyed afterwards regardless of whether the
// thread handle was valid. Failures to post or destroy are only logged.
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
    // Wake the worker so it can observe the stop/drop flags.
    if (0 != tsem_post(&dmNotifyHdl.sem)) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  if (0 != tsem_destroy(&dmNotifyHdl.sem)) {
    dError("failed to destroy notify sem");
  }
}
×
432

UNCOV
433
// Creates the joinable "dnode-monitor" thread running dmMonitorThreadFp and
// stores its handle in pMgmt->monitorThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
  return 0;
}
448

UNCOV
449
// Creates the joinable "dnode-audit" thread running dmAuditThreadFp and
// stores its handle in pMgmt->auditThread.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
  return 0;
}
464

UNCOV
465
// Joins the monitor thread and clears its handle; does nothing when the
// handle in pMgmt->monitorThread is not valid.
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
}
×
471

UNCOV
472
// Joins the audit thread and clears its handle; does nothing when the
// handle in pMgmt->auditThread is not valid.
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
}
×
478

UNCOV
479
// Creates the joinable "dnode-crashReport" thread running
// dmCrashReportThreadFp and stores its handle in pMgmt->crashReportThread.
// Does nothing and returns 0 when tsEnableCrashReport is false.
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths.
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture the error first so the cleanup call cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
  return 0;
}
498

UNCOV
499
// Joins the crash-report thread and clears its handle. Does nothing when
// crash reporting is disabled (tsEnableCrashReport is false) or when the
// handle in pMgmt->crashReportThread is not valid.
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
}
509

UNCOV
510
// Queue callback for the dnode management worker.
// Dispatches pMsg by msgType to the matching handler; unknown types yield
// TSDB_CODE_MSG_NOT_PROCESSED. For request messages (IsReq), a response
// carrying the handler's result (or terrno, when both are non-zero) is sent
// back. The message content and the queue item are freed before returning.
//
// pInfo - queue info; pInfo->ahandle is used as the SDnodeMgmt.
// pMsg  - message taken from the queue; freed here.
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  STraceId   *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      code = dmProcessGrantNotify(NULL, pMsg);
      break;
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
  }

  if (IsReq(pMsg)) {
    if (code != 0 && terrno != 0) code = terrno;
    SRpcMsg rsp = {
        .code = code,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
        .info = pMsg->info,
    };

    // Keep the send status separate so the trace below still reports the
    // result of processing the message rather than of sending the reply.
    int32_t sendCode = rpcSendResponse(&rsp);
    if (sendCode != 0) {
      dError("failed to send response since %s", tstrerror(sendCode));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
×
587

UNCOV
588
// Initializes the single-threaded "dnode-mgmt" worker whose queue items are
// handled by dmProcessMgmtQueue with pMgmt as its parameter.
// Returns 0 on success, otherwise the error code from tSingleWorkerInit().
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg cfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
605

UNCOV
606
// Cleans up the management worker held in pMgmt->mgmtWorker and logs that
// the dnode workers are closed.
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;

  tSingleWorkerCleanup(pWorker);
  dDebug("dnode workers are closed");
}
×
610

UNCOV
611
// Enqueues pMsg on the management worker's queue.
// Returns whatever taosWriteQitem() returns.
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);

  int32_t code = taosWriteQitem(pWorker->queue, pMsg);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc