• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.3
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
22
#include "taoskInt.h"
23
#endif
24

25
// Encryption key expiration constants
26
#define ENCRYPT_KEY_EXPIRE_DAYS      30
27
#define MILLISECONDS_PER_DAY         (24 * 3600 * 1000)
28
#define ENCRYPT_KEY_EXPIRE_THRESHOLD ((int64_t)ENCRYPT_KEY_EXPIRE_DAYS * MILLISECONDS_PER_DAY)
29

30
static void *dmStatusThreadFp(void *param) {
544,803✔
31
  SDnodeMgmt *pMgmt = param;
544,803✔
32
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
33
  setThreadName("dnode-status");
544,803✔
34

35
  while (1) {
820,676,785✔
36
    taosMsleep(50);
821,221,588✔
37
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
821,221,588✔
38

39
    int64_t curTime = taosGetTimestampMs();
820,676,785✔
40
    if (curTime < lastTime) lastTime = curTime;
820,676,785✔
41
    float interval = curTime - lastTime;
820,676,785✔
42
    if (interval >= tsStatusIntervalMs) {
820,676,785✔
43
      dmSendStatusReq(pMgmt);
41,119,207✔
44
      lastTime = curTime;
41,119,207✔
45
    }
46
  }
47

48
  return NULL;
544,803✔
49
}
50

51
static void *dmConfigThreadFp(void *param) {
544,803✔
52
  SDnodeMgmt *pMgmt = param;
544,803✔
53
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
54
  setThreadName("dnode-config");
544,803✔
55
  while (1) {
10,894,355✔
56
    taosMsleep(50);
11,439,158✔
57
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
11,439,158✔
58

59
    int64_t curTime = taosGetTimestampMs();
10,894,355✔
60
    if (curTime < lastTime) lastTime = curTime;
10,894,355✔
61
    float interval = curTime - lastTime;
10,894,355✔
62
    if (interval >= tsStatusIntervalMs) {
10,894,355✔
63
      dmSendConfigReq(pMgmt);
562,825✔
64
      lastTime = curTime;
562,825✔
65
    }
66
  }
67
  return NULL;
544,803✔
68
}
69

70
static void *dmKeySyncThreadFp(void *param) {
544,803✔
71
  SDnodeMgmt *pMgmt = param;
544,803✔
72
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
73
  setThreadName("dnode-keysync");
544,803✔
74

75
  // Wait a bit before first sync attempt
76
  taosMsleep(3000);
544,803✔
77

78
  while (1) {
5,442,060✔
79
    taosMsleep(100);
5,986,863✔
80
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
5,986,863✔
81

82
    int64_t curTime = taosGetTimestampMs();
5,945,358✔
83
    if (curTime < lastTime) lastTime = curTime;
5,945,358✔
84
    float interval = curTime - lastTime;
5,945,358✔
85
    if (interval >= tsStatusIntervalMs) {
5,945,358✔
86
      // Sync keys periodically (every 30 seconds) or on first run
87
      if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
1,101,858✔
88
        // Check if encryption keys are expired
89
        int64_t svrKeyAge = curTime - tsSvrKeyUpdateTime;
66,941✔
90
        int64_t dbKeyAge = curTime - tsDbKeyUpdateTime;
66,941✔
91

92
        if (svrKeyAge > ENCRYPT_KEY_EXPIRE_THRESHOLD || dbKeyAge > ENCRYPT_KEY_EXPIRE_THRESHOLD) {
66,941✔
93
          dWarn("encryption keys may be expired, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
94
                " days, attempting reload",
95
                svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
96
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
97
          // Try to reload keys from file
98
          char masterKeyFile[PATH_MAX] = {0};
×
99
          char derivedKeyFile[PATH_MAX] = {0};
×
100
          snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
101
                   TD_DIRSEP, TD_DIRSEP);
102
          snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
103
                   TD_DIRSEP, TD_DIRSEP);
104

NEW
105
          char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
NEW
106
          char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
NEW
107
          char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
NEW
108
          char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
NEW
109
          char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
110
          int32_t algorithm = 0;
×
111
          int32_t cfgAlgorithm = 0;
×
112
          int32_t metaAlgorithm = 0;
×
113
          int32_t fileVersion = 0;
×
114
          int32_t keyVersion = 0;
×
115
          int64_t createTime = 0;
×
116
          int64_t svrKeyUpdateTime = 0;
×
117
          int64_t dbKeyUpdateTime = 0;
×
118

119
          int32_t code =
120
              taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
121
                                   &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
122
                                   &svrKeyUpdateTime, &dbKeyUpdateTime);
123
          if (code == 0) {
×
124
            // Update global variables with reloaded keys
125
            tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
126
            tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
127
            tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
128
            tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
129
            tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
130
            tsEncryptAlgorithmType = algorithm;
×
131
            tsCfgAlgorithm = cfgAlgorithm;
×
132
            tsMetaAlgorithm = metaAlgorithm;
×
133
            tsEncryptFileVersion = fileVersion;
×
134
            tsEncryptKeyVersion = keyVersion;
×
135
            tsEncryptKeyCreateTime = createTime;
×
136
            tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
137
            tsDbKeyUpdateTime = dbKeyUpdateTime;
×
138

139
            // Check if keys are still expired after reload
140
            svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
141
            dbKeyAge = curTime - tsDbKeyUpdateTime;
×
142
            if (svrKeyAge > ENCRYPT_KEY_EXPIRE_THRESHOLD || dbKeyAge > ENCRYPT_KEY_EXPIRE_THRESHOLD) {
×
143
              dError("encryption keys are still expired after reload, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
144
                     " days, please rotate keys",
145
                     svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
146
            } else {
147
              dInfo("successfully reloaded encryption keys, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64 " days",
×
148
                    svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
149
            }
150
          } else {
151
            dError("failed to reload encryption keys since %s", tstrerror(code));
×
152
          }
153
#endif
154
      }
155
      } else if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_DISABLED) {
1,034,917✔
156
        dInfo("encryption keys are disabled, stopping key sync thread");
503,298✔
157
        break;
503,298✔
158
      } else {
159
        dmSendKeySyncReq(pMgmt);
531,619✔
160
      }
161
      lastTime = curTime;
598,560✔
162
    }
163
  }
164
  return NULL;
544,803✔
165
}
166

167
static void *dmStatusInfoThreadFp(void *param) {
544,803✔
168
  SDnodeMgmt *pMgmt = param;
544,803✔
169
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
170
  setThreadName("dnode-status-info");
544,803✔
171

172
  int32_t upTimeCount = 0;
544,803✔
173
  int64_t upTime = 0;
544,803✔
174

175
  while (1) {
835,923,174✔
176
    taosMsleep(50);
836,467,977✔
177
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
836,467,977✔
178

179
    int64_t curTime = taosGetTimestampMs();
835,923,174✔
180
    if (curTime < lastTime) lastTime = curTime;
835,923,174✔
181
    float interval = curTime - lastTime;
835,923,174✔
182
    if (interval >= tsStatusIntervalMs) {
835,923,174✔
183
      dmUpdateStatusInfo(pMgmt);
41,662,439✔
184
      lastTime = curTime;
41,662,439✔
185

186
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
41,662,439✔
187
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
462,721✔
188
        if (upTime > 0) tsDndUpTime = upTime;
462,721✔
189
      }
190
    }
191
  }
192

193
  return NULL;
544,803✔
194
}
195

196
#if defined(TD_ENTERPRISE)
197
SDmNotifyHandle dmNotifyHdl = {.state = 0};
198
#define TIMESERIES_STASH_NUM 5
199
static void *dmNotifyThreadFp(void *param) {
544,803✔
200
  SDnodeMgmt *pMgmt = param;
544,803✔
201
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
202
  setThreadName("dnode-notify");
544,803✔
203

204
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
544,803✔
205
    return NULL;
×
206
  }
207

208
  // calculate approximate timeSeries per second
209
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
543,401✔
210
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
543,401✔
211
  int64_t  approximateTimeSeries = 0;
544,803✔
212
  uint64_t nTotalNotify = 0;
544,803✔
213
  int32_t  head, tail = 0;
544,803✔
214

215
  bool       wait = true;
544,803✔
216
  int32_t    nDnode = 0;
544,803✔
217
  int64_t    lastNotify = 0;
544,803✔
218
  int64_t    lastFetchDnode = 0;
544,803✔
219
  SNotifyReq req = {0};
544,803✔
220
  while (1) {
48,890,228✔
221
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
49,435,031✔
222
    if (wait) tsem_wait(&dmNotifyHdl.sem);
48,890,228✔
223
    atomic_store_8(&dmNotifyHdl.state, 1);
48,890,228✔
224

225
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
48,890,228✔
226
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
48,890,228✔
227
      goto _skip;
48,890,228✔
228
    }
229
    int64_t current = taosGetTimestampMs();
×
230
    if (current - lastFetchDnode > 1000) {
×
231
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
232
      if (nDnode < 1) nDnode = 1;
×
233
      lastFetchDnode = current;
×
234
    }
235
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
236
      req.dnodeId = pMgmt->pData->dnodeId;
×
237
      req.clusterId = pMgmt->pData->clusterId;
×
238
    }
239

240
    if (current - lastNotify < 10) {
×
241
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
242
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
243
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
244
        taosMsleep(10);
×
245
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
246
        taosMsleep(5);
×
247
      } else {
248
        taosMsleep(2);
×
249
      }
250
    }
251

252
    SMonVloadInfo vinfo = {0};
×
253
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
254
    req.pVloads = vinfo.pVloads;
×
255
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
256
    int64_t nTimeSeries = 0;
×
257
    for (int32_t i = 0; i < nVgroup; ++i) {
×
258
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
259
      nTimeSeries += vload->nTimeSeries;
×
260
    }
261
    notifyTimeSeries[tail] = nTimeSeries;
×
262
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
263
    ++nTotalNotify;
×
264

265
    approximateTimeSeries = 0;
×
266
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
267
      head = tail - TIMESERIES_STASH_NUM + 1;
×
268
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
269
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
270
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
271
      if (tsDiff > 0) {
×
272
        if (timeDiff > 0 && timeDiff < 1e9) {
×
273
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
274
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
275
            dmSendNotifyReq(pMgmt, &req);
×
276
          }
277
        } else {
278
          dmSendNotifyReq(pMgmt, &req);
×
279
        }
280
      }
281
    } else {
282
      dmSendNotifyReq(pMgmt, &req);
×
283
    }
284
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
285

286
    tFreeSNotifyReq(&req);
×
287
    lastNotify = taosGetTimestampMs();
×
288
  _skip:
48,890,228✔
289
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
48,890,228✔
290
      wait = true;
48,881,995✔
291
      continue;
48,881,995✔
292
    }
293
    wait = false;
8,233✔
294
  }
295

296
  return NULL;
544,803✔
297
}
298
#endif
299

300
#ifdef USE_MONITOR
301
static void *dmMonitorThreadFp(void *param) {
544,803✔
302
  SDnodeMgmt *pMgmt = param;
544,803✔
303
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
304
  int64_t     lastTimeForBasic = taosGetTimestampMs();
544,803✔
305
  setThreadName("dnode-monitor");
544,803✔
306

307
  static int32_t TRIM_FREQ = 20;
308
  int32_t        trimCount = 0;
544,803✔
309

310
  while (1) {
210,506,760✔
311
    taosMsleep(200);
211,051,563✔
312
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
211,051,563✔
313

314
    int64_t curTime = taosGetTimestampMs();
210,506,760✔
315

316
    if (curTime < lastTime) lastTime = curTime;
210,506,760✔
317
    float interval = (curTime - lastTime) / 1000.0f;
210,506,760✔
318
    if (interval >= tsMonitorInterval) {
210,506,760✔
319
      (*pMgmt->sendMonitorReportFp)();
1,180,434✔
320
      (*pMgmt->monitorCleanExpiredSamplesFp)();
1,180,434✔
321
      lastTime = curTime;
1,180,434✔
322

323
      trimCount = (trimCount + 1) % TRIM_FREQ;
1,180,434✔
324
      if (trimCount == 0) {
1,180,434✔
325
        taosMemoryTrim(0, NULL);
1,915✔
326
      }
327
    }
328
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
210,506,760✔
329
      taosMemoryTrim(0, NULL);
7,851,846✔
330
    }
331
  }
332

333
  return NULL;
544,803✔
334
}
335
#endif
336
#ifdef USE_AUDIT
337
static void *dmAuditThreadFp(void *param) {
544,803✔
338
  SDnodeMgmt *pMgmt = param;
544,803✔
339
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
340
  setThreadName("dnode-audit");
544,803✔
341

342
  while (1) {
420,425,610✔
343
    taosMsleep(100);
420,970,413✔
344
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
420,970,413✔
345

346
    int64_t curTime = taosGetTimestampMs();
420,425,610✔
347
    if (curTime < lastTime) lastTime = curTime;
420,425,610✔
348
    float interval = curTime - lastTime;
420,425,610✔
349
    if (interval >= tsAuditInterval) {
420,425,610✔
350
      (*pMgmt->sendAuditRecordsFp)();
8,147,098✔
351
      lastTime = curTime;
8,147,098✔
352
    }
353
  }
354

355
  return NULL;
544,803✔
356
}
357
#endif
358
#ifdef USE_REPORT
359
static void *dmCrashReportThreadFp(void *param) {
×
360
  int32_t     code = 0;
×
361
  SDnodeMgmt *pMgmt = param;
×
362
  int64_t     lastTime = taosGetTimestampMs();
×
363
  setThreadName("dnode-crashReport");
×
364
  char filepath[PATH_MAX] = {0};
×
365
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
366
  char     *pMsg = NULL;
×
367
  int64_t   msgLen = 0;
×
368
  TdFilePtr pFile = NULL;
×
369
  bool      truncateFile = false;
×
370
  int32_t   sleepTime = 200;
×
371
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
372
  int32_t   loopTimes = reportPeriodNum;
×
373

374
  STelemAddrMgmt mgt = {0};
×
375
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
376
  if (code != 0) {
×
377
    dError("failed to init telemetry since %s", tstrerror(code));
×
378
    return NULL;
×
379
  }
380
  code = initCrashLogWriter();
×
381
  if (code != 0) {
×
382
    dError("failed to init crash log writer since %s", tstrerror(code));
×
383
    return NULL;
×
384
  }
385

386
  while (1) {
387
    checkAndPrepareCrashInfo();
×
388
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
389
      break;
×
390
    }
391
    if (loopTimes++ < reportPeriodNum) {
×
392
      taosMsleep(sleepTime);
×
393
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
394
      continue;
×
395
    }
396
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
397
    if (pMsg && msgLen > 0) {
×
398
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
399
        dError("failed to send crash report");
×
400
        if (pFile) {
×
401
          taosReleaseCrashLogFile(pFile, false);
×
402
          pFile = NULL;
×
403

404
          taosMsleep(sleepTime);
×
405
          loopTimes = 0;
×
406
          continue;
×
407
        }
408
      } else {
409
        dInfo("succeed to send crash report");
×
410
        truncateFile = true;
×
411
      }
412
    } else {
413
      dInfo("no crash info was found");
×
414
    }
415

416
    taosMemoryFree(pMsg);
×
417

418
    if (pMsg && msgLen > 0) {
×
419
      pMsg = NULL;
×
420
      continue;
×
421
    }
422

423
    if (pFile) {
×
424
      taosReleaseCrashLogFile(pFile, truncateFile);
×
425
      pFile = NULL;
×
426
      truncateFile = false;
×
427
    }
428

429
    taosMsleep(sleepTime);
×
430
    loopTimes = 0;
×
431
  }
432
  taosTelemetryDestroy(&mgt);
×
433

434
  return NULL;
×
435
}
436
#endif
437

438
static void *dmMetricsThreadFp(void *param) {
544,803✔
439
  SDnodeMgmt *pMgmt = param;
544,803✔
440
  int64_t     lastTime = taosGetTimestampMs();
544,803✔
441
  setThreadName("dnode-metrics");
544,803✔
442
  while (1) {
210,512,522✔
443
    taosMsleep(200);
211,057,325✔
444
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
211,057,325✔
445

446
    int64_t curTime = taosGetTimestampMs();
210,512,522✔
447
    if (curTime < lastTime) lastTime = curTime;
210,512,522✔
448
    float interval = (curTime - lastTime) / 1000.0f;
210,512,522✔
449
    if (interval >= tsMetricsInterval) {
210,512,522✔
450
      (*pMgmt->sendMetricsReportFp)();
1,179,331✔
451
      (*pMgmt->metricsCleanExpiredSamplesFp)();
1,179,331✔
452
      lastTime = curTime;
1,179,331✔
453
    }
454
  }
455
  return NULL;
544,803✔
456
}
457

458
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
544,803✔
459
  int32_t      code = 0;
544,803✔
460
  TdThreadAttr thAttr;
543,401✔
461
  (void)taosThreadAttrInit(&thAttr);
544,803✔
462
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
463
#ifdef TD_COMPACT_OS
464
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
465
#endif
466
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
544,803✔
467
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
468
    dError("failed to create status thread since %s", tstrerror(code));
×
469
    return code;
×
470
  }
471

472
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
473
  tmsgReportStartup("dnode-status", "initialized");
544,803✔
474
  return 0;
544,803✔
475
}
476

477
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
544,803✔
478
  int32_t      code = 0;
544,803✔
479
  TdThreadAttr thAttr;
543,401✔
480
  (void)taosThreadAttrInit(&thAttr);
544,803✔
481
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
482
#ifdef TD_COMPACT_OS
483
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
484
#endif
485
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
544,803✔
486
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
487
    dError("failed to create config thread since %s", tstrerror(code));
×
488
    return code;
×
489
  }
490

491
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
492
  tmsgReportStartup("config-status", "initialized");
544,803✔
493
  return 0;
544,803✔
494
}
495

496
int32_t dmStartKeySyncThread(SDnodeMgmt *pMgmt) {
544,803✔
497
  int32_t      code = 0;
544,803✔
498
  TdThreadAttr thAttr;
543,401✔
499
  (void)taosThreadAttrInit(&thAttr);
544,803✔
500
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
501
#ifdef TD_COMPACT_OS
502
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
503
#endif
504
  if (taosThreadCreate(&pMgmt->keySyncThread, &thAttr, dmKeySyncThreadFp, pMgmt) != 0) {
544,803✔
505
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
506
    dError("failed to create key sync thread since %s", tstrerror(code));
×
507
    return code;
×
508
  }
509

510
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
511
  tmsgReportStartup("dnode-keysync", "initialized");
544,803✔
512
  return 0;
544,803✔
513
}
514

515
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
544,803✔
516
  int32_t      code = 0;
544,803✔
517
  TdThreadAttr thAttr;
543,401✔
518
  (void)taosThreadAttrInit(&thAttr);
544,803✔
519
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
520
#ifdef TD_COMPACT_OS
521
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
522
#endif
523
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
544,803✔
524
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
525
    dError("failed to create status Info thread since %s", tstrerror(code));
×
526
    return code;
×
527
  }
528

529
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
530
  tmsgReportStartup("dnode-status-info", "initialized");
544,803✔
531
  return 0;
544,803✔
532
}
533

534
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
544,803✔
535
  if (taosCheckPthreadValid(pMgmt->statusThread)) {
544,803✔
536
    (void)taosThreadJoin(pMgmt->statusThread, NULL);
544,803✔
537
    taosThreadClear(&pMgmt->statusThread);
544,803✔
538
  }
539
}
544,803✔
540

541
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
544,803✔
542
  if (taosCheckPthreadValid(pMgmt->configThread)) {
544,803✔
543
    (void)taosThreadJoin(pMgmt->configThread, NULL);
544,803✔
544
    taosThreadClear(&pMgmt->configThread);
544,803✔
545
  }
546
}
544,803✔
547

548
void dmStopKeySyncThread(SDnodeMgmt *pMgmt) {
544,803✔
549
  if (taosCheckPthreadValid(pMgmt->keySyncThread)) {
544,803✔
550
    (void)taosThreadJoin(pMgmt->keySyncThread, NULL);
544,803✔
551
    taosThreadClear(&pMgmt->keySyncThread);
544,803✔
552
  }
553
}
544,803✔
554

555
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
544,803✔
556
  if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
544,803✔
557
    (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
544,803✔
558
    taosThreadClear(&pMgmt->statusInfoThread);
544,803✔
559
  }
560
}
544,803✔
561
#ifdef TD_ENTERPRISE
562
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
544,803✔
563
  int32_t      code = 0;
544,803✔
564
  TdThreadAttr thAttr;
543,401✔
565
  (void)taosThreadAttrInit(&thAttr);
544,803✔
566
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
567
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
544,803✔
568
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
569
    dError("failed to create notify thread since %s", tstrerror(code));
×
570
    return code;
×
571
  }
572

573
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
574
  tmsgReportStartup("dnode-notify", "initialized");
544,803✔
575
  return 0;
544,803✔
576
}
577

578
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
544,803✔
579
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
544,803✔
580
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
544,803✔
581
      dError("failed to post notify sem");
×
582
    }
583

584
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
544,803✔
585
    taosThreadClear(&pMgmt->notifyThread);
544,803✔
586
  }
587
  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
544,803✔
588
    dError("failed to destroy notify sem");
×
589
  }
590
}
544,803✔
591
#endif
592
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
544,803✔
593
  int32_t      code = 0;
544,803✔
594
#ifdef USE_MONITOR
595
  TdThreadAttr thAttr;
543,401✔
596
  (void)taosThreadAttrInit(&thAttr);
544,803✔
597
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
598
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
544,803✔
599
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
600
    dError("failed to create monitor thread since %s", tstrerror(code));
×
601
    return code;
×
602
  }
603

604
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
605
  tmsgReportStartup("dnode-monitor", "initialized");
544,803✔
606
#endif
607
  return 0;
544,803✔
608
}
609

610
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
544,803✔
611
  int32_t      code = 0;
544,803✔
612
#ifdef USE_AUDIT  
613
  TdThreadAttr thAttr;
543,401✔
614
  (void)taosThreadAttrInit(&thAttr);
544,803✔
615
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
616
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
544,803✔
617
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
618
    dError("failed to create audit thread since %s", tstrerror(code));
×
619
    return code;
×
620
  }
621

622
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
623
  tmsgReportStartup("dnode-audit", "initialized");
544,803✔
624
#endif  
625
  return 0;
544,803✔
626
}
627

628
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
544,803✔
629
  int32_t code = 0;
544,803✔
630
#ifdef USE_MONITOR
631
  TdThreadAttr thAttr;
543,401✔
632
  (void)taosThreadAttrInit(&thAttr);
544,803✔
633
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
544,803✔
634
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
544,803✔
635
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
636
    dError("failed to create metrics thread since %s", tstrerror(code));
×
637
    return code;
×
638
  }
639

640
  (void)taosThreadAttrDestroy(&thAttr);
544,803✔
641
  tmsgReportStartup("dnode-metrics", "initialized");
544,803✔
642
#endif
643
  return 0;
544,803✔
644
}
645

646
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
544,803✔
647
#ifdef USE_MONITOR
648
  if (taosCheckPthreadValid(pMgmt->monitorThread)) {
544,803✔
649
    (void)taosThreadJoin(pMgmt->monitorThread, NULL);
544,803✔
650
    taosThreadClear(&pMgmt->monitorThread);
544,803✔
651
  }
652
#endif
653
}
544,803✔
654

655
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
544,803✔
656
#ifdef USE_AUDIT
657
  if (taosCheckPthreadValid(pMgmt->auditThread)) {
544,803✔
658
    (void)taosThreadJoin(pMgmt->auditThread, NULL);
544,803✔
659
    taosThreadClear(&pMgmt->auditThread);
544,803✔
660
  }
661
#endif
662
}
544,803✔
663

664
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
544,803✔
665
  int32_t code = 0;
544,803✔
666
#ifdef USE_REPORT
667
  if (!tsEnableCrashReport) {
544,803✔
668
    return 0;
544,803✔
669
  }
670

671
  TdThreadAttr thAttr;
×
672
  (void)taosThreadAttrInit(&thAttr);
×
673
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
674
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
×
675
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
676
    dError("failed to create crashReport thread since %s", tstrerror(code));
×
677
    return code;
×
678
  }
679

680
  (void)taosThreadAttrDestroy(&thAttr);
×
681
  tmsgReportStartup("dnode-crashReport", "initialized");
×
682
#endif
683
  return 0;
×
684
}
685

686
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
544,803✔
687
#ifdef USE_REPORT
688
  if (!tsEnableCrashReport) {
544,803✔
689
    return;
544,803✔
690
  }
691

692
  if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
×
693
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
×
694
    taosThreadClear(&pMgmt->crashReportThread);
×
695
  }
696
#endif
697
}
698

699
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
544,803✔
700
  if (taosCheckPthreadValid(pMgmt->metricsThread)) {
544,803✔
701
    (void)taosThreadJoin(pMgmt->metricsThread, NULL);
544,803✔
702
    taosThreadClear(&pMgmt->metricsThread);
544,803✔
703
  }
704
}
544,803✔
705

706
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
72,734,991✔
707
  SDnodeMgmt *pMgmt = pInfo->ahandle;
72,734,991✔
708
  int32_t     code = -1;
72,734,991✔
709
  STraceId   *trace = &pMsg->info.traceId;
72,734,991✔
710
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
72,734,991✔
711

712
  switch (pMsg->msgType) {
72,734,991✔
713
    case TDMT_DND_CONFIG_DNODE:
70,610✔
714
      code = dmProcessConfigReq(pMgmt, pMsg);
70,610✔
715
      break;
70,610✔
716
    case TDMT_MND_AUTH_RSP:
×
717
      code = dmProcessAuthRsp(pMgmt, pMsg);
×
718
      break;
×
719
    case TDMT_MND_GRANT_RSP:
×
720
      code = dmProcessGrantRsp(pMgmt, pMsg);
×
721
      break;
×
722
    case TDMT_DND_CREATE_MNODE:
13,590✔
723
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
13,590✔
724
      break;
13,590✔
725
    case TDMT_DND_DROP_MNODE:
560✔
726
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
560✔
727
      break;
560✔
728
    case TDMT_DND_CREATE_QNODE:
3,594✔
729
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
3,594✔
730
      break;
3,594✔
731
    case TDMT_DND_DROP_QNODE:
372✔
732
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
372✔
733
      break;
372✔
734
    case TDMT_DND_CREATE_SNODE:
52,732✔
735
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
52,732✔
736
      break;
52,732✔
737
    case TDMT_DND_ALTER_SNODE:
92,166✔
738
      code = (*pMgmt->processAlterNodeFp)(SNODE, pMsg);
92,166✔
739
      break;
92,166✔
740
    case TDMT_DND_DROP_SNODE:
32,618✔
741
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
32,618✔
742
      break;
32,618✔
743
    case TDMT_DND_CREATE_BNODE:
19,901✔
744
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
19,901✔
745
      break;
19,901✔
746
    case TDMT_DND_DROP_BNODE:
19,890✔
747
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
19,890✔
748
      break;
19,890✔
749
    case TDMT_DND_ALTER_MNODE_TYPE:
171,503✔
750
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
171,503✔
751
      break;
171,503✔
752
    case TDMT_DND_SERVER_STATUS:
171✔
753
      code = dmProcessServerRunStatus(pMgmt, pMsg);
171✔
754
      break;
171✔
755
    case TDMT_DND_SYSTABLE_RETRIEVE:
22,951✔
756
      code = dmProcessRetrieve(pMgmt, pMsg);
22,951✔
757
      break;
22,951✔
758
    case TDMT_MND_GRANT:
735,578✔
759
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
735,578✔
760
      break;
735,578✔
761
    case TDMT_MND_GRANT_NOTIFY:
71,498,602✔
762
      code = dmProcessGrantNotify(NULL, pMsg);
71,498,602✔
763
      break;
71,498,602✔
764
    case TDMT_DND_CREATE_ENCRYPT_KEY:
153✔
765
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
153✔
766
      break;
153✔
767
    case TDMT_MND_ALTER_ENCRYPT_KEY:
×
768
      code = dmProcessAlterEncryptKeyReq(pMgmt, pMsg);
×
769
      break;
×
770
    case TDMT_DND_RELOAD_DNODE_TLS:
×
771
      code = dmProcessReloadTlsConfig(pMgmt, pMsg);
×
772
      // code = dmProcessReloadEncryptKeyReq(pMgmt, pMsg);
773
      break;
×
774
    default:
×
775

776
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
777
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
778
      break;
×
779
  }
780

781
  if (IsReq(pMsg)) {
72,734,991✔
782
    if (code != 0 && terrno != 0) code = terrno;
72,734,991✔
783
    SRpcMsg rsp = {
145,464,352✔
784
        .code = code,
785
        .pCont = pMsg->info.rsp,
72,734,991✔
786
        .contLen = pMsg->info.rspLen,
72,734,991✔
787
        .info = pMsg->info,
788
    };
789

790
    code = rpcSendResponse(&rsp);
72,734,991✔
791
    if (code != 0) {
72,734,991✔
792
      dError("failed to send response since %s", tstrerror(code));
×
793
    }
794
  }
795

796
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
72,734,991✔
797
  rpcFreeCont(pMsg->pCont);
72,734,991✔
798
  taosFreeQitem(pMsg);
72,734,991✔
799
}
72,734,991✔
800

801
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
14,150,505✔
802
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
14,150,505✔
803
  if (pMsg->code) {
14,150,505✔
804
    *pWorkerIdx = 0;
182,962✔
805
    return TSDB_CODE_SUCCESS;
182,962✔
806
  }
807
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
13,967,543✔
808
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
13,967,543✔
809
  return TSDB_CODE_SUCCESS;
13,967,543✔
810
}
811

812

813
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
14,150,505✔
814
  SDnodeMgmt *pMgmt = pInfo->ahandle;
14,150,505✔
815
  int32_t     code = -1;
14,150,505✔
816
  STraceId   *trace = &pMsg->info.traceId;
14,150,505✔
817
  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
14,150,505✔
818

819
  switch (pMsg->msgType) {
14,150,505✔
820
    case TDMT_MND_STREAM_HEARTBEAT_RSP:
14,150,505✔
821
      code = dmProcessStreamHbRsp(pMgmt, pMsg);
14,150,505✔
822
      break;
14,150,505✔
823
    default:
×
824
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
825
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
826
      break;
×
827
  }
828

829
  if (IsReq(pMsg)) {
14,150,505✔
830
    if (code != 0 && terrno != 0) code = terrno;
×
831
    SRpcMsg rsp = {
×
832
        .code = code,
833
        .pCont = pMsg->info.rsp,
×
834
        .contLen = pMsg->info.rspLen,
×
835
        .info = pMsg->info,
836
    };
837

838
    code = rpcSendResponse(&rsp);
×
839
    if (code != 0) {
×
840
      dError("failed to send response since %s", tstrerror(code));
×
841
    }
842
  }
843

844
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
14,150,505✔
845
  rpcFreeCont(pMsg->pCont);
14,150,505✔
846
  taosFreeQitem(pMsg);
14,150,505✔
847
}
14,150,505✔
848

849

850
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
544,803✔
851
  int32_t          code = 0;
544,803✔
852
  SSingleWorkerCfg cfg = {
544,803✔
853
      .min = 1,
854
      .max = 1,
855
      .name = "dnode-mgmt",
856
      .fp = (FItem)dmProcessMgmtQueue,
857
      .param = pMgmt,
858
  };
859
  if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
544,803✔
860
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
×
861
    return code;
×
862
  }
863

864
  SDispatchWorkerPool* pStMgmtpool = &pMgmt->streamMgmtWorker;
544,803✔
865
  pStMgmtpool->max = tsNumOfStreamMgmtThreads;
544,803✔
866
  pStMgmtpool->name = "dnode-stream-mgmt";
544,803✔
867
  code = tDispatchWorkerInit(pStMgmtpool);
544,803✔
868
  if (code != 0) {
544,803✔
869
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
×
870
    return code;
×
871
  }
872
  code = tDispatchWorkerAllocQueue(pStMgmtpool, pMgmt, (FItem)dmProcessStreamMgmtQueue, dmDispatchStreamHbMsg);
544,803✔
873
  if (code != 0) {
544,803✔
874
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
×
875
    return code;
×
876
  }
877

878
  dDebug("dnode workers are initialized");
544,803✔
879
  return 0;
544,803✔
880
}
881

882
void dmStopWorker(SDnodeMgmt *pMgmt) {
544,803✔
883
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
544,803✔
884
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorker);
544,803✔
885
  dDebug("dnode workers are closed");
544,803✔
886
}
544,803✔
887

888
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
72,734,887✔
889
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
72,734,887✔
890
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
72,734,991✔
891
  return taosWriteQitem(pWorker->queue, pMsg);
72,734,991✔
892
}
893

894
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
14,150,505✔
895
  return tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorker, pMsg);
14,150,505✔
896
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc