• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.92
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20

21
static void *dmStatusThreadFp(void *param) {
1,563✔
22
  SDnodeMgmt *pMgmt = param;
1,563✔
23
  int64_t     lastTime = taosGetTimestampMs();
1,563✔
24
  setThreadName("dnode-status");
1,563✔
25

26
  int32_t upTimeCount = 0;
1,563✔
27
  int64_t upTime = 0;
1,563✔
28

29
  while (1) {
220,508✔
30
    taosMsleep(200);
222,071✔
31
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
222,071!
32

33
    int64_t curTime = taosGetTimestampMs();
220,508✔
34
    if (curTime < lastTime) lastTime = curTime;
220,508!
35
    float interval = (curTime - lastTime) / 1000.0f;
220,508✔
36
    if (interval >= tsStatusInterval) {
220,508✔
37
      dmSendStatusReq(pMgmt);
43,795✔
38
      lastTime = curTime;
43,795✔
39

40
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
43,795✔
41
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
337✔
42
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
337✔
43
      }
44
    }
45
  }
46

47
  return NULL;
1,563✔
48
}
49

50
static void *dmStatusInfoThreadFp(void *param) {
1,563✔
51
  SDnodeMgmt *pMgmt = param;
1,563✔
52
  int64_t     lastTime = taosGetTimestampMs();
1,563✔
53
  setThreadName("dnode-status-info");
1,563✔
54

55
  int32_t upTimeCount = 0;
1,563✔
56
  int64_t upTime = 0;
1,563✔
57

58
  while (1) {
229,236✔
59
    taosMsleep(200);
230,799✔
60
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
230,799!
61

62
    int64_t curTime = taosGetTimestampMs();
229,236✔
63
    if (curTime < lastTime) lastTime = curTime;
229,236!
64
    float interval = (curTime - lastTime) / 1000.0f;
229,236✔
65
    if (interval >= tsStatusInterval) {
229,236✔
66
      dmUpdateStatusInfo(pMgmt);
45,203✔
67
      lastTime = curTime;
45,203✔
68

69
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
45,203✔
70
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
343✔
71
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
343✔
72
      }
73
    }
74
  }
75

76
  return NULL;
1,563✔
77
}
78

79
SDmNotifyHandle dmNotifyHdl = {.state = 0};
80
#define TIMESERIES_STASH_NUM 5
81
static void *dmNotifyThreadFp(void *param) {
1,563✔
82
  SDnodeMgmt *pMgmt = param;
1,563✔
83
  int64_t     lastTime = taosGetTimestampMs();
1,563✔
84
  setThreadName("dnode-notify");
1,563✔
85

86
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
1,563!
87
    return NULL;
×
88
  }
89

90
  // calculate approximate timeSeries per second
91
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
92
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
93
  int64_t  approximateTimeSeries = 0;
1,563✔
94
  uint64_t nTotalNotify = 0;
1,563✔
95
  int32_t  head, tail = 0;
1,563✔
96

97
  bool       wait = true;
1,563✔
98
  int32_t    nDnode = 0;
1,563✔
99
  int64_t    lastNotify = 0;
1,563✔
100
  int64_t    lastFetchDnode = 0;
1,563✔
101
  SNotifyReq req = {0};
1,563✔
102
  while (1) {
16,680✔
103
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
18,243!
104
    if (wait) tsem_wait(&dmNotifyHdl.sem);
16,680✔
105
    atomic_store_8(&dmNotifyHdl.state, 1);
16,680✔
106

107
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
16,680✔
108
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
16,680!
109
      goto _skip;
16,680✔
110
    }
111
    int64_t current = taosGetTimestampMs();
×
112
    if (current - lastFetchDnode > 1000) {
×
113
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
114
      if (nDnode < 1) nDnode = 1;
×
115
      lastFetchDnode = current;
×
116
    }
117
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
118
      req.dnodeId = pMgmt->pData->dnodeId;
×
119
      req.clusterId = pMgmt->pData->clusterId;
×
120
    }
121

122
    if (current - lastNotify < 10) {
×
123
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
124
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
125
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
126
        taosMsleep(10);
×
127
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
128
        taosMsleep(5);
×
129
      } else {
130
        taosMsleep(2);
×
131
      }
132
    }
133

134
    SMonVloadInfo vinfo = {0};
×
135
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
136
    req.pVloads = vinfo.pVloads;
×
137
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
138
    int64_t nTimeSeries = 0;
×
139
    for (int32_t i = 0; i < nVgroup; ++i) {
×
140
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
141
      nTimeSeries += vload->nTimeSeries;
×
142
    }
143
    notifyTimeSeries[tail] = nTimeSeries;
×
144
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
145
    ++nTotalNotify;
×
146

147
    approximateTimeSeries = 0;
×
148
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
149
      head = tail - TIMESERIES_STASH_NUM + 1;
×
150
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
151
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
152
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
153
      if (tsDiff > 0) {
×
154
        if (timeDiff > 0 && timeDiff < 1e9) {
×
155
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
156
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
157
            dmSendNotifyReq(pMgmt, &req);
×
158
          }
159
        } else {
160
          dmSendNotifyReq(pMgmt, &req);
×
161
        }
162
      }
163
    } else {
164
      dmSendNotifyReq(pMgmt, &req);
×
165
    }
166
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
167

168
    tFreeSNotifyReq(&req);
×
169
    lastNotify = taosGetTimestampMs();
×
170
  _skip:
16,680✔
171
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
16,680✔
172
      wait = true;
16,673✔
173
      continue;
16,673✔
174
    }
175
    wait = false;
7✔
176
  }
177

178
  return NULL;
1,563✔
179
}
180

181
static void *dmMonitorThreadFp(void *param) {
1,563✔
182
  SDnodeMgmt *pMgmt = param;
1,563✔
183
  int64_t     lastTime = taosGetTimestampMs();
1,563✔
184
  int64_t     lastTimeForBasic = taosGetTimestampMs();
1,563✔
185
  setThreadName("dnode-monitor");
1,563✔
186

187
  static int32_t TRIM_FREQ = 20;
188
  int32_t        trimCount = 0;
1,563✔
189

190
  while (1) {
229,245✔
191
    taosMsleep(200);
230,808✔
192
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
230,808!
193

194
    int64_t curTime = taosGetTimestampMs();
229,245✔
195

196
    if (curTime < lastTime) lastTime = curTime;
229,245!
197
    float interval = (curTime - lastTime) / 1000.0f;
229,245✔
198
    if (interval >= tsMonitorInterval) {
229,245✔
199
      (*pMgmt->sendMonitorReportFp)();
1,006✔
200
      (*pMgmt->monitorCleanExpiredSamplesFp)();
1,006✔
201
      lastTime = curTime;
1,006✔
202

203
      trimCount = (trimCount + 1) % TRIM_FREQ;
1,006✔
204
      if (trimCount == 0) {
1,006!
UNCOV
205
        taosMemoryTrim(0);
×
206
      }
207
    }
208
  }
209

210
  return NULL;
1,563✔
211
}
212

213
static void *dmAuditThreadFp(void *param) {
1,563✔
214
  SDnodeMgmt *pMgmt = param;
1,563✔
215
  int64_t     lastTime = taosGetTimestampMs();
1,563✔
216
  setThreadName("dnode-audit");
1,563✔
217

218
  while (1) {
459,032✔
219
    taosMsleep(100);
460,595✔
220
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
460,595!
221

222
    int64_t curTime = taosGetTimestampMs();
459,032✔
223
    if (curTime < lastTime) lastTime = curTime;
459,032!
224
    float interval = curTime - lastTime;
459,032✔
225
    if (interval >= tsAuditInterval) {
459,032✔
226
      (*pMgmt->sendAuditRecordsFp)();
8,421✔
227
      lastTime = curTime;
8,421✔
228
    }
229
  }
230

231
  return NULL;
1,563✔
232
}
233

234
static void *dmCrashReportThreadFp(void *param) {
×
235
  SDnodeMgmt *pMgmt = param;
×
236
  int64_t     lastTime = taosGetTimestampMs();
×
237
  setThreadName("dnode-crashReport");
×
238
  char filepath[PATH_MAX] = {0};
×
239
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
240
  char     *pMsg = NULL;
×
241
  int64_t   msgLen = 0;
×
242
  TdFilePtr pFile = NULL;
×
243
  bool      truncateFile = false;
×
244
  int32_t   sleepTime = 200;
×
245
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
246
  ;
247
  int32_t loopTimes = reportPeriodNum;
×
248

249
  while (1) {
250
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
×
251
    if (loopTimes++ < reportPeriodNum) {
×
252
      taosMsleep(sleepTime);
×
253
      continue;
×
254
    }
255

256
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
257
    if (pMsg && msgLen > 0) {
×
258
      if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
259
        dError("failed to send crash report");
×
260
        if (pFile) {
×
261
          taosReleaseCrashLogFile(pFile, false);
×
262
          pFile = NULL;
×
263

264
          taosMsleep(sleepTime);
×
265
          loopTimes = 0;
×
266
          continue;
×
267
        }
268
      } else {
269
        dInfo("succeed to send crash report");
×
270
        truncateFile = true;
×
271
      }
272
    } else {
273
      dDebug("no crash info");
×
274
    }
275

276
    taosMemoryFree(pMsg);
×
277

278
    if (pMsg && msgLen > 0) {
×
279
      pMsg = NULL;
×
280
      continue;
×
281
    }
282

283
    if (pFile) {
×
284
      taosReleaseCrashLogFile(pFile, truncateFile);
×
285
      pFile = NULL;
×
286
      truncateFile = false;
×
287
    }
288

289
    taosMsleep(sleepTime);
×
290
    loopTimes = 0;
×
291
  }
292

293
  return NULL;
×
294
}
295

296
/*
 * Creates the joinable "dnode-status" thread running dmStatusThreadFp.
 *
 * pMgmt  - dnode management object; receives the thread handle in statusThread.
 * return - 0 on success, TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
311

312
/*
 * Creates the joinable "dnode-status-info" thread running dmStatusInfoThreadFp.
 *
 * pMgmt  - dnode management object; receives the thread handle in statusInfoThread.
 * return - 0 on success, TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
327

328
/*
 * Joins the status thread and clears its handle. Does nothing when the handle
 * is not valid.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
1,563✔
334

335
/*
 * Joins the status-info thread and clears its handle. Does nothing when the
 * handle is not valid.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
1,563✔
341

342
/*
 * Creates the joinable "dnode-notify" thread running dmNotifyThreadFp.
 *
 * pMgmt  - dnode management object; receives the thread handle in notifyThread.
 * return - 0 on success, TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
357

358
/*
 * Stops the notify thread: posts the notify semaphore so a thread blocked in
 * tsem_wait() can observe the stop flag, joins it and clears the handle.
 * The semaphore is destroyed afterwards whether or not the thread handle was valid.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  bool threadValid = taosCheckPthreadValid(pMgmt->notifyThread);

  if (threadValid) {
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
    dError("failed to destroy notify sem");
  }
}
1,563✔
371

372
/*
 * Creates the joinable "dnode-monitor" thread running dmMonitorThreadFp.
 *
 * pMgmt  - dnode management object; receives the thread handle in monitorThread.
 * return - 0 on success, TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
  return 0;
}
387

388
/*
 * Creates the joinable "dnode-audit" thread running dmAuditThreadFp.
 *
 * pMgmt  - dnode management object; receives the thread handle in auditThread.
 * return - 0 on success, TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
  return 0;
}
403

404
/*
 * Joins the monitor thread and clears its handle. Does nothing when the handle
 * is not valid.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
}
1,563✔
410

411
/*
 * Joins the audit thread and clears its handle. Does nothing when the handle
 * is not valid.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
}
1,563✔
417

418
/*
 * Creates the joinable "dnode-crashReport" thread running dmCrashReportThreadFp.
 * Does nothing and reports success when tsEnableCrashReport is off.
 *
 * pMgmt  - dnode management object; receives the thread handle in crashReportThread.
 * return - 0 on success or when crash reporting is disabled,
 *          TAOS_SYSTEM_ERROR(errno) if the thread cannot be created.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture errno first, then release the attribute object on the failure path too.
    code = TAOS_SYSTEM_ERROR(errno);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
  return 0;
}
437

438
/*
 * Joins the crash-report thread and clears its handle. Does nothing when
 * tsEnableCrashReport is off or the handle is not valid.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
}
448

449
/*
 * Worker callback for the dnode-mgmt queue: dispatches one message by msgType,
 * sends a response when the message is a request, then frees the message
 * content and the queue item.
 *
 * pInfo - queue info; its ahandle is the SDnodeMgmt instance.
 * pMsg  - message taken from the queue; released before returning.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  // NOTE(review): `trace` is never referenced explicitly below; presumably the
  // dG* logging macros pick it up by name - confirm before renaming or removing it.
  STraceId   *trace = &pMsg->info.traceId;
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;

  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    /* handlers implemented by the dnode manager itself */
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;

    /* node create / drop / alter requests go through the callbacks stored in pMgmt */
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;

    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      code = dmProcessGrantNotify(NULL, pMsg);
      break;
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;

    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
  }

  if (IsReq(pMsg)) {
    // On failure, report terrno instead of the handler's code when terrno is set.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg rsp = {0};
    rsp.code = code;
    rsp.pCont = pMsg->info.rsp;
    rsp.contLen = pMsg->info.rspLen;
    rsp.info = pMsg->info;

    // From here on `code` holds the result of sending the response.
    code = rpcSendResponse(&rsp);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
58,395✔
526

527
/*
 * Initialises the single "dnode-mgmt" worker (min = max = 1) whose items are
 * handled by dmProcessMgmtQueue with pMgmt as the callback parameter.
 *
 * return - 0 on success, otherwise the code returned by tSingleWorkerInit().
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg cfg = {0};
  cfg.min = 1;
  cfg.max = 1;
  cfg.name = "dnode-mgmt";
  cfg.fp = (FItem)dmProcessMgmtQueue;
  cfg.param = pMgmt;

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
544

545
/*
 * Cleans up the dnode-mgmt worker that dmStartWorker() initialised.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;

  tSingleWorkerCleanup(pWorker);
  dDebug("dnode workers are closed");
}
1,563✔
549

550
/*
 * Writes a message into the queue of the dnode-mgmt worker.
 *
 * return - the result of taosWriteQitem().
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);
  return taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc