• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.99
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
22
#include "taoskInt.h"
23
#endif
24

25
// Encryption key expiration constants
26
#define MILLISECONDS_PER_DAY (24 * 3600 * 1000)
27

28
static void *dmStatusThreadFp(void *param) {
16✔
29
  SDnodeMgmt *pMgmt = param;
16✔
30
  int64_t     lastTime = taosGetTimestampMs();
16✔
31
  setThreadName("dnode-status");
16✔
32

33
  while (1) {
759✔
34
    taosMsleep(50);
775✔
35
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
775✔
36

37
    int64_t curTime = taosGetTimestampMs();
759✔
38
    if (curTime < lastTime) lastTime = curTime;
759✔
39
    float interval = curTime - lastTime;
759✔
40
    if (interval >= tsStatusIntervalMs) {
759✔
41
      dmSendStatusReq(pMgmt);
33✔
42
      lastTime = curTime;
33✔
43
    }
44
  }
45

46
  return NULL;
16✔
47
}
48

49
static void *dmConfigThreadFp(void *param) {
16✔
50
  SDnodeMgmt *pMgmt = param;
16✔
51
  int64_t     lastTime = taosGetTimestampMs();
16✔
52
  setThreadName("dnode-config");
16✔
53
  while (1) {
761✔
54
    taosMsleep(50);
777✔
55
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
777✔
56

57
    int64_t curTime = taosGetTimestampMs();
761✔
58
    if (curTime < lastTime) lastTime = curTime;
761✔
59
    float interval = curTime - lastTime;
761✔
60
    if (interval >= tsStatusIntervalMs) {
761✔
61
      dmSendConfigReq(pMgmt);
33✔
62
      lastTime = curTime;
33✔
63
    }
64
  }
65
  return NULL;
16✔
66
}
67

68
static void *dmKeySyncThreadFp(void *param) {
16✔
69
  SDnodeMgmt *pMgmt = param;
16✔
70
  int64_t     lastTime = taosGetTimestampMs();
16✔
71
  setThreadName("dnode-keysync");
16✔
72

73
  // Wait a bit before first sync attempt
74
  taosMsleep(3000);
16✔
75

76
  while (1) {
20✔
77
    taosMsleep(100);
36✔
78
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
36✔
79

80
    int64_t curTime = taosGetTimestampMs();
22✔
81
    if (curTime < lastTime) lastTime = curTime;
22✔
82
    float interval = curTime - lastTime;
22✔
83
    if (interval >= tsStatusIntervalMs) {
22✔
84
      // Sync keys periodically (every 30 seconds) or on first run
85
      if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
4✔
86
        // Check if encryption keys are expired based on configured threshold
87
        int64_t keyExpirationThreshold = (int64_t)tsKeyExpirationDays * MILLISECONDS_PER_DAY;
×
88
        int64_t svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
89
        int64_t dbKeyAge = curTime - tsDbKeyUpdateTime;
×
90

91
        if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
×
92
          const char *action = (strcmp(tsKeyExpirationStrategy, "ALARM") == 0) ? "warning" : "attempting reload";
×
93
          dWarn("encryption keys may be expired (threshold:%d days, strategy:%s), svrKeyAge:%" PRId64
×
94
                " days, dbKeyAge:%" PRId64 " days, %s",
95
                tsKeyExpirationDays, tsKeyExpirationStrategy, svrKeyAge / MILLISECONDS_PER_DAY,
96
                dbKeyAge / MILLISECONDS_PER_DAY, action);
97
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
98
          // Try to reload keys from file
99
          char masterKeyFile[PATH_MAX] = {0};
×
100
          char derivedKeyFile[PATH_MAX] = {0};
×
101
          snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
102
                   TD_DIRSEP, TD_DIRSEP);
103
          snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
104
                   TD_DIRSEP, TD_DIRSEP);
105

106
          char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
107
          char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
108
          char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
109
          char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
110
          char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
111
          int32_t algorithm = 0;
×
112
          int32_t cfgAlgorithm = 0;
×
113
          int32_t metaAlgorithm = 0;
×
114
          int32_t fileVersion = 0;
×
115
          int32_t keyVersion = 0;
×
116
          int64_t createTime = 0;
×
117
          int64_t svrKeyUpdateTime = 0;
×
118
          int64_t dbKeyUpdateTime = 0;
×
119

120
          int32_t code =
121
              taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
122
                                   &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
123
                                   &svrKeyUpdateTime, &dbKeyUpdateTime);
124
          if (code == 0) {
×
125
            // Update global variables with reloaded keys
126
            tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
127
            tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
128
            tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
129
            tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
130
            tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
131
            tsEncryptAlgorithmType = algorithm;
×
132
            tsCfgAlgorithm = cfgAlgorithm;
×
133
            tsMetaAlgorithm = metaAlgorithm;
×
134
            tsEncryptFileVersion = fileVersion;
×
135
            tsEncryptKeyVersion = keyVersion;
×
136
            tsEncryptKeyCreateTime = createTime;
×
137
            tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
138
            tsDbKeyUpdateTime = dbKeyUpdateTime;
×
139

140
            // Check if keys are still expired after reload
141
            svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
142
            dbKeyAge = curTime - tsDbKeyUpdateTime;
×
143
            if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
×
144
              dError("encryption keys are still expired after reload (threshold:%d days), svrKeyAge:%" PRId64
×
145
                     " days, dbKeyAge:%" PRId64 " days, please rotate keys",
146
                     tsKeyExpirationDays, svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
147
            } else {
148
              dInfo("successfully reloaded encryption keys, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
149
                    " days (threshold:%d days)",
150
                    svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY, tsKeyExpirationDays);
151
            }
152
          } else {
153
            dError("failed to reload encryption keys since %s", tstrerror(code));
×
154
          }
155
#endif
156
        }
157
      } else if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_DISABLED) {
4✔
158
        dInfo("encryption keys are disabled, stopping key sync thread");
2✔
159
        break;
2✔
160
      } else {
161
        dmSendKeySyncReq(pMgmt);
2✔
162
      }
163
      lastTime = curTime;
2✔
164
    }
165
  }
166
  return NULL;
16✔
167
}
168

169
static void *dmStatusInfoThreadFp(void *param) {
16✔
170
  SDnodeMgmt *pMgmt = param;
16✔
171
  int64_t     lastTime = taosGetTimestampMs();
16✔
172
  setThreadName("dnode-status-info");
16✔
173

174
  int32_t upTimeCount = 0;
16✔
175
  int64_t upTime = 0;
16✔
176

177
  while (1) {
765✔
178
    taosMsleep(50);
781✔
179
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
781✔
180

181
    int64_t curTime = taosGetTimestampMs();
765✔
182
    if (curTime < lastTime) lastTime = curTime;
765✔
183
    float interval = curTime - lastTime;
765✔
184
    if (interval >= tsStatusIntervalMs) {
765✔
185
      dmUpdateStatusInfo(pMgmt);
33✔
186
      lastTime = curTime;
33✔
187

188
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
33✔
189
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
×
190
        if (upTime > 0) tsDndUpTime = upTime;
×
191
      }
192
    }
193
  }
194

195
  return NULL;
16✔
196
}
197

198
#if defined(TD_ENTERPRISE)
199
SDmNotifyHandle dmNotifyHdl = {.state = 0};
200
#define TIMESERIES_STASH_NUM 5
201
static void *dmNotifyThreadFp(void *param) {
16✔
202
  SDnodeMgmt *pMgmt = param;
16✔
203
  int64_t     lastTime = taosGetTimestampMs();
16✔
204
  setThreadName("dnode-notify");
16✔
205

206
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
16✔
207
    return NULL;
×
208
  }
209

210
  // calculate approximate timeSeries per second
211
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
212
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
213
  int64_t  approximateTimeSeries = 0;
16✔
214
  uint64_t nTotalNotify = 0;
16✔
215
  int32_t  head, tail = 0;
16✔
216

217
  bool       wait = true;
16✔
218
  int32_t    nDnode = 0;
16✔
219
  int64_t    lastNotify = 0;
16✔
220
  int64_t    lastFetchDnode = 0;
16✔
221
  SNotifyReq req = {0};
16✔
222
  while (1) {
16✔
223
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
32✔
224
    if (wait) tsem_wait(&dmNotifyHdl.sem);
16✔
225
    atomic_store_8(&dmNotifyHdl.state, 1);
16✔
226

227
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
16✔
228
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
16✔
229
      goto _skip;
16✔
230
    }
231
    int64_t current = taosGetTimestampMs();
×
232
    if (current - lastFetchDnode > 1000) {
×
233
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
234
      if (nDnode < 1) nDnode = 1;
×
235
      lastFetchDnode = current;
×
236
    }
237
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
238
      req.dnodeId = pMgmt->pData->dnodeId;
×
239
      req.clusterId = pMgmt->pData->clusterId;
×
240
    }
241

242
    if (current - lastNotify < 10) {
×
243
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
244
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
245
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
246
        taosMsleep(10);
×
247
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
248
        taosMsleep(5);
×
249
      } else {
250
        taosMsleep(2);
×
251
      }
252
    }
253

254
    SMonVloadInfo vinfo = {0};
×
255
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
256
    req.pVloads = vinfo.pVloads;
×
257
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
258
    int64_t nTimeSeries = 0;
×
259
    for (int32_t i = 0; i < nVgroup; ++i) {
×
260
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
261
      nTimeSeries += vload->nTimeSeries;
×
262
    }
263
    notifyTimeSeries[tail] = nTimeSeries;
×
264
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
265
    ++nTotalNotify;
×
266

267
    approximateTimeSeries = 0;
×
268
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
269
      head = tail - TIMESERIES_STASH_NUM + 1;
×
270
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
271
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
272
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
273
      if (tsDiff > 0) {
×
274
        if (timeDiff > 0 && timeDiff < 1e9) {
×
275
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
276
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
277
            dmSendNotifyReq(pMgmt, &req);
×
278
          }
279
        } else {
280
          dmSendNotifyReq(pMgmt, &req);
×
281
        }
282
      }
283
    } else {
284
      dmSendNotifyReq(pMgmt, &req);
×
285
    }
286
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
287

288
    tFreeSNotifyReq(&req);
×
289
    lastNotify = taosGetTimestampMs();
×
290
  _skip:
16✔
291
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
16✔
292
      wait = true;
16✔
293
      continue;
16✔
294
    }
295
    wait = false;
×
296
  }
297

298
  return NULL;
16✔
299
}
300
#endif
301

302
#ifdef USE_MONITOR
303
static void *dmMonitorThreadFp(void *param) {
16✔
304
  SDnodeMgmt *pMgmt = param;
16✔
305
  int64_t     lastTime = taosGetTimestampMs();
16✔
306
  int64_t     lastTimeForBasic = taosGetTimestampMs();
16✔
307
  setThreadName("dnode-monitor");
16✔
308

309
  static int32_t TRIM_FREQ = 20;
310
  int32_t        trimCount = 0;
16✔
311

312
  while (1) {
185✔
313
    taosMsleep(200);
201✔
314
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
201✔
315

316
    int64_t curTime = taosGetTimestampMs();
185✔
317

318
    if (curTime < lastTime) lastTime = curTime;
185✔
319
    float interval = (curTime - lastTime) / 1000.0f;
185✔
320
    if (interval >= tsMonitorInterval) {
185✔
321
      (*pMgmt->sendMonitorReportFp)();
×
322
      (*pMgmt->monitorCleanExpiredSamplesFp)();
×
323
      lastTime = curTime;
×
324

325
      trimCount = (trimCount + 1) % TRIM_FREQ;
×
326
      if (trimCount == 0) {
×
327
        taosMemoryTrim(0, NULL);
×
328
      }
329
    }
330
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
185✔
331
      taosMemoryTrim(0, NULL);
×
332
    }
333
  }
334

335
  return NULL;
16✔
336
}
337
#endif
338
#ifdef USE_AUDIT
339
static void *dmAuditThreadFp(void *param) {
16✔
340
  SDnodeMgmt *pMgmt = param;
16✔
341
  int64_t     lastTime = taosGetTimestampMs();
16✔
342
  setThreadName("dnode-audit");
16✔
343

344
  while (1) {
378✔
345
    taosMsleep(100);
394✔
346
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
394✔
347

348
    int64_t curTime = taosGetTimestampMs();
378✔
349
    if (curTime < lastTime) lastTime = curTime;
378✔
350
    float interval = curTime - lastTime;
378✔
351
    if (interval >= tsAuditInterval) {
378✔
352
      (*pMgmt->sendAuditRecordsFp)();
3✔
353
      lastTime = curTime;
3✔
354
    }
355
  }
356

357
  return NULL;
16✔
358
}
359
#endif
360
#ifdef USE_REPORT
361
static void *dmCrashReportThreadFp(void *param) {
×
362
  int32_t     code = 0;
×
363
  SDnodeMgmt *pMgmt = param;
×
364
  int64_t     lastTime = taosGetTimestampMs();
×
365
  setThreadName("dnode-crashReport");
×
366
  char filepath[PATH_MAX] = {0};
×
367
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
368
  char     *pMsg = NULL;
×
369
  int64_t   msgLen = 0;
×
370
  TdFilePtr pFile = NULL;
×
371
  bool      truncateFile = false;
×
372
  int32_t   sleepTime = 200;
×
373
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
374
  int32_t   loopTimes = reportPeriodNum;
×
375

376
  STelemAddrMgmt mgt = {0};
×
377
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
378
  if (code != 0) {
×
379
    dError("failed to init telemetry since %s", tstrerror(code));
×
380
    return NULL;
×
381
  }
382
  code = initCrashLogWriter();
×
383
  if (code != 0) {
×
384
    dError("failed to init crash log writer since %s", tstrerror(code));
×
385
    return NULL;
×
386
  }
387

388
  while (1) {
389
    checkAndPrepareCrashInfo();
×
390
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
391
      break;
×
392
    }
393
    if (loopTimes++ < reportPeriodNum) {
×
394
      taosMsleep(sleepTime);
×
395
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
396
      continue;
×
397
    }
398
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
399
    if (pMsg && msgLen > 0) {
×
400
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
401
        dError("failed to send crash report");
×
402
        if (pFile) {
×
403
          taosReleaseCrashLogFile(pFile, false);
×
404
          pFile = NULL;
×
405

406
          taosMsleep(sleepTime);
×
407
          loopTimes = 0;
×
408
          continue;
×
409
        }
410
      } else {
411
        dInfo("succeed to send crash report");
×
412
        truncateFile = true;
×
413
      }
414
    } else {
415
      dInfo("no crash info was found");
×
416
    }
417

418
    taosMemoryFree(pMsg);
×
419

420
    if (pMsg && msgLen > 0) {
×
421
      pMsg = NULL;
×
422
      continue;
×
423
    }
424

425
    if (pFile) {
×
426
      taosReleaseCrashLogFile(pFile, truncateFile);
×
427
      pFile = NULL;
×
428
      truncateFile = false;
×
429
    }
430

431
    taosMsleep(sleepTime);
×
432
    loopTimes = 0;
×
433
  }
434
  taosTelemetryDestroy(&mgt);
×
435

436
  return NULL;
×
437
}
438
#endif
439

440
static void *dmMetricsThreadFp(void *param) {
16✔
441
  SDnodeMgmt *pMgmt = param;
16✔
442
  int64_t     lastTime = taosGetTimestampMs();
16✔
443
  setThreadName("dnode-metrics");
16✔
444
  while (1) {
186✔
445
    taosMsleep(200);
202✔
446
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
202✔
447

448
    int64_t curTime = taosGetTimestampMs();
186✔
449
    if (curTime < lastTime) lastTime = curTime;
186✔
450
    float interval = (curTime - lastTime) / 1000.0f;
186✔
451
    if (interval >= tsMetricsInterval) {
186✔
452
      (*pMgmt->sendMetricsReportFp)();
×
453
      (*pMgmt->metricsCleanExpiredSamplesFp)();
×
454
      lastTime = curTime;
×
455
    }
456
  }
457
  return NULL;
16✔
458
}
459

460
/*
 * Start the joinable "dnode-status" thread running dmStatusThreadFp.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
478

479
/*
 * Start the joinable "dnode-config" thread running dmConfigThreadFp.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("config-status", "initialized");
  return 0;
}
497

498
/*
 * Start the joinable "dnode-keysync" thread running dmKeySyncThreadFp.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartKeySyncThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->keySyncThread, &thAttr, dmKeySyncThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create key sync thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-keysync", "initialized");
  return 0;
}
516

517
/*
 * Start the joinable "dnode-status-info" thread running dmStatusInfoThreadFp.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
535

536
/*
 * Join the "dnode-status" thread, if one was started, and clear its handle.
 * The thread exits by itself once the dnode is marked dropped or stopped.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
542

543
/*
 * Join the "dnode-config" thread, if one was started, and clear its handle.
 */
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
549

550
/*
 * Join the "dnode-keysync" thread, if one was started, and clear its handle.
 */
void dmStopKeySyncThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->keySyncThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->keySyncThread, NULL);
  taosThreadClear(&pMgmt->keySyncThread);
}
556

557
/*
 * Join the "dnode-status-info" thread, if one was started, and clear its handle.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
563
#ifdef TD_ENTERPRISE
564
/*
 * Start the joinable "dnode-notify" thread running dmNotifyThreadFp (TD_ENTERPRISE builds).
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
579

580
/*
 * Stop the "dnode-notify" thread (TD_ENTERPRISE builds).
 *
 * If the thread is running, post dmNotifyHdl.sem so a thread blocked in tsem_wait can observe the stop flags, then
 * join it and clear the handle. The semaphore is destroyed afterwards in either case. Failures to post or destroy
 * are logged, not propagated.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  bool running = taosCheckPthreadValid(pMgmt->notifyThread);
  if (running) {
    int32_t postRc = tsem_post(&dmNotifyHdl.sem);
    if (postRc != 0) {
      dError("failed to post notify sem");
    }
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  int32_t destroyRc = tsem_destroy(&dmNotifyHdl.sem);
  if (destroyRc != 0) {
    dError("failed to destroy notify sem");
  }
}
593
#endif
594
/*
 * Start the joinable "dnode-monitor" thread running dmMonitorThreadFp. This is a no-op returning 0 unless
 * USE_MONITOR is defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
611

612
/*
 * Start the joinable "dnode-audit" thread running dmAuditThreadFp. This is a no-op returning 0 unless USE_AUDIT
 * is defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
629

630
/*
 * Start the joinable "dnode-metrics" thread running dmMetricsThreadFp. This is a no-op returning 0 unless
 * USE_MONITOR is defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create metrics thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-metrics", "initialized");
#endif
  return 0;
}
647

648
/*
 * Join the "dnode-monitor" thread, if one was started, and clear its handle. Does nothing without USE_MONITOR.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
656

657
/*
 * Join the "dnode-audit" thread, if one was started, and clear its handle. Does nothing without USE_AUDIT.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
665

666
/*
 * Start the joinable "dnode-crashReport" thread running dmCrashReportThreadFp. The thread is only started when
 * USE_REPORT is defined and tsEnableCrashReport is set; otherwise this returns 0 without doing anything.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be created. The thread attribute object
 * is released on both paths.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // Capture ERRNO before the attribute cleanup can overwrite it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
687

688
/*
 * Join the "dnode-crashReport" thread and clear its handle. This only happens when USE_REPORT is defined,
 * tsEnableCrashReport is set, and the thread handle is valid.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (!tsEnableCrashReport || !taosCheckPthreadValid(pMgmt->crashReportThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
  taosThreadClear(&pMgmt->crashReportThread);
#endif
}
700

701
/*
 * Join the "dnode-metrics" thread, if one was started, and clear its handle.
 */
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->metricsThread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->metricsThread, NULL);
  taosThreadClear(&pMgmt->metricsThread);
}
707

708
/*
 * Worker callback for the single-threaded "dnode-mgmt" queue.
 *
 * Dispatches the message to its handler by msgType; unknown types yield TSDB_CODE_MSG_NOT_PROCESSED. For request
 * messages it sends a response carrying the handler's result, substituting terrno when the handler failed and
 * terrno is set. Finally it frees the message payload and the queue item.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *mgmt = pInfo->ahandle;
  STraceId   *trace = &pMsg->info.traceId;  // read by the dG* logging macros
  int32_t     ret = -1;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    // dnode configuration, auth and grant
    case TDMT_DND_CONFIG_DNODE:
      ret = dmProcessConfigReq(mgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      ret = dmProcessAuthRsp(mgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      ret = dmProcessGrantRsp(mgmt, pMsg);
      break;

    // node lifecycle (mnode / qnode / snode / bnode)
    case TDMT_DND_CREATE_MNODE:
      ret = mgmt->processCreateNodeFp(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      ret = mgmt->processDropNodeFp(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      ret = mgmt->processCreateNodeFp(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      ret = mgmt->processDropNodeFp(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      ret = mgmt->processCreateNodeFp(SNODE, pMsg);
      break;
    case TDMT_DND_ALTER_SNODE:
      ret = mgmt->processAlterNodeFp(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      ret = mgmt->processDropNodeFp(SNODE, pMsg);
      break;
    case TDMT_DND_CREATE_BNODE:
      ret = mgmt->processCreateNodeFp(BNODE, pMsg);
      break;
    case TDMT_DND_DROP_BNODE:
      ret = mgmt->processDropNodeFp(BNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      ret = mgmt->processAlterNodeTypeFp(MNODE, pMsg);
      break;

    // status and system tables
    case TDMT_DND_SERVER_STATUS:
      ret = dmProcessServerRunStatus(mgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      ret = dmProcessRetrieve(mgmt, pMsg);
      break;

    // grants
    case TDMT_MND_GRANT:
      ret = dmProcessGrantReq(&mgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      ret = dmProcessGrantNotify(NULL, pMsg);
      break;

    // encryption keys and TLS
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      ret = dmProcessCreateEncryptKeyReq(mgmt, pMsg);
      break;
    case TDMT_MND_ALTER_ENCRYPT_KEY:
      ret = dmProcessAlterEncryptKeyReq(mgmt, pMsg);
      break;
    case TDMT_MND_ALTER_KEY_EXPIRATION:
      ret = dmProcessAlterKeyExpirationReq(mgmt, pMsg);
      break;
    case TDMT_DND_RELOAD_DNODE_TLS:
      ret = dmProcessReloadTlsConfig(mgmt, pMsg);
      break;

    default:
      ret = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(ret));
      break;
  }

  if (IsReq(pMsg)) {
    // Prefer the more specific thread-local error when the handler failed.
    if (ret != 0 && terrno != 0) {
      ret = terrno;
    }
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = ret,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
    };
    ret = rpcSendResponse(&rsp);
    if (ret != 0) {
      dError("failed to send response since %s", tstrerror(ret));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, ret);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
805

806
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
6✔
807
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
6✔
808
  if (pMsg->code) {
6✔
809
    *pWorkerIdx = 0;
×
810
    return TSDB_CODE_SUCCESS;
×
811
  }
812
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
6✔
813
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
6✔
814
  return TSDB_CODE_SUCCESS;
6✔
815
}
816

817

818
/*
 * Worker callback for the "dnode-stream-mgmt" dispatch pool.
 *
 * Only TDMT_MND_STREAM_HEARTBEAT_RSP is handled; any other type yields TSDB_CODE_MSG_NOT_PROCESSED. For request
 * messages it sends a response with the result, substituting terrno when the handler failed and terrno is set.
 * The payload and the queue item are always freed.
 */
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *mgmt = pInfo->ahandle;
  STraceId   *trace = &pMsg->info.traceId;  // read by the dG* logging macros
  int32_t     ret = -1;
  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  if (pMsg->msgType == TDMT_MND_STREAM_HEARTBEAT_RSP) {
    ret = dmProcessStreamHbRsp(mgmt, pMsg);
  } else {
    ret = TSDB_CODE_MSG_NOT_PROCESSED;
    dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(ret));
  }

  if (IsReq(pMsg)) {
    if (ret != 0 && terrno != 0) {
      ret = terrno;
    }
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = ret,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
    };
    ret = rpcSendResponse(&rsp);
    if (ret != 0) {
      dError("failed to send response since %s", tstrerror(ret));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, ret);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
853

854

855
/*
 * Create the dnode management workers:
 *   - "dnode-mgmt": a single-thread worker running dmProcessMgmtQueue;
 *   - "dnode-stream-mgmt": a dispatch pool of tsNumOfStreamMgmtThreads workers running dmProcessStreamMgmtQueue,
 *     with messages routed by dmDispatchStreamHbMsg.
 * Returns 0 on success or the first failing step's error code. Steps already completed are not undone here.
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };
  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool *pPool = &pMgmt->streamMgmtWorker;
  pPool->max = tsNumOfStreamMgmtThreads;
  pPool->name = "dnode-stream-mgmt";

  if ((code = tDispatchWorkerInit(pPool)) != 0) {
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  if ((code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)dmProcessStreamMgmtQueue, dmDispatchStreamHbMsg)) != 0) {
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
886

887
/*
 * Tear down the workers created by dmStartWorker. The single "dnode-mgmt" worker is cleaned up first, then the
 * "dnode-stream-mgmt" dispatch pool.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  // Order preserved: mgmt worker first, then the stream dispatch pool.
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorker);

  dDebug("dnode workers are closed");
}
892

893
/*
 * Enqueue pMsg on the "dnode-mgmt" worker queue. Returns the result of taosWriteQitem.
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);
  return taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
}
898

899
/*
 * Hand pMsg to the "dnode-stream-mgmt" dispatch pool. The worker is chosen by dmDispatchStreamHbMsg.
 * Returns the pool's result code.
 */
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t code = tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorker, pMsg);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc