• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.93
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
22
#include "taoskInt.h"
23
#endif
24

25
// Encryption key expiration constants
26
#define MILLISECONDS_PER_DAY (24 * 3600 * 1000)
27

28
static void *dmStatusThreadFp(void *param) {
556,805✔
29
  SDnodeMgmt *pMgmt = param;
556,805✔
30
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
31
  setThreadName("dnode-status");
556,805✔
32

33
  while (1) {
1,029,908,943✔
34
    taosMsleep(50);
1,030,465,748✔
35
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,030,465,748✔
36

37
    int64_t curTime = taosGetTimestampMs();
1,029,908,943✔
38
    if (curTime < lastTime) lastTime = curTime;
1,029,908,943✔
39
    float interval = curTime - lastTime;
1,029,908,943✔
40
    if (interval >= tsStatusIntervalMs) {
1,029,908,943✔
41
      dmSendStatusReq(pMgmt);
51,424,769✔
42
      lastTime = curTime;
51,424,769✔
43
    }
44
  }
45

46
  return NULL;
556,805✔
47
}
48

49
static void *dmConfigThreadFp(void *param) {
556,805✔
50
  SDnodeMgmt *pMgmt = param;
556,805✔
51
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
52
  setThreadName("dnode-config");
556,805✔
53
  while (1) {
11,115,541✔
54
    taosMsleep(50);
11,672,346✔
55
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
11,672,346✔
56

57
    int64_t curTime = taosGetTimestampMs();
11,115,541✔
58
    if (curTime < lastTime) lastTime = curTime;
11,115,541✔
59
    float interval = curTime - lastTime;
11,115,541✔
60
    if (interval >= tsStatusIntervalMs) {
11,115,541✔
61
      dmSendConfigReq(pMgmt);
563,008✔
62
      lastTime = curTime;
563,008✔
63
    }
64
  }
65
  return NULL;
556,805✔
66
}
67

68
static void *dmKeySyncThreadFp(void *param) {
556,805✔
69
  SDnodeMgmt *pMgmt = param;
1,113,407✔
70
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
71
  setThreadName("dnode-keysync");
556,805✔
72

73
  // Wait a bit before first sync attempt
74
  taosMsleep(3000);
556,805✔
75

76
  while (1) {
5,427,303✔
77
    taosMsleep(100);
5,984,108✔
78
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
5,984,108✔
79

80
    int64_t curTime = taosGetTimestampMs();
5,949,028✔
81
    if (curTime < lastTime) lastTime = curTime;
5,949,028✔
82
    float interval = curTime - lastTime;
5,949,028✔
83
    if (interval >= tsStatusIntervalMs) {
5,949,028✔
84
      // Sync keys periodically (every 30 seconds) or on first run
85
      if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
1,099,322✔
86
        // Check if encryption keys are expired based on configured threshold
87
        int64_t keyExpirationThreshold = (int64_t)tsKeyExpirationDays * MILLISECONDS_PER_DAY;
35,352✔
88
        int64_t svrKeyAge = curTime - tsSvrKeyUpdateTime;
35,352✔
89
        int64_t dbKeyAge = curTime - tsDbKeyUpdateTime;
35,352✔
90

91
        if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
35,352✔
NEW
92
          const char *action = (strcmp(tsKeyExpirationStrategy, "ALARM") == 0) ? "warning" : "attempting reload";
×
NEW
93
          dWarn("encryption keys may be expired (threshold:%d days, strategy:%s), svrKeyAge:%" PRId64
×
94
                " days, dbKeyAge:%" PRId64 " days, %s",
95
                tsKeyExpirationDays, tsKeyExpirationStrategy, svrKeyAge / MILLISECONDS_PER_DAY,
96
                dbKeyAge / MILLISECONDS_PER_DAY, action);
97
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
98
          // Try to reload keys from file
99
          char masterKeyFile[PATH_MAX] = {0};
×
100
          char derivedKeyFile[PATH_MAX] = {0};
×
101
          snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
102
                   TD_DIRSEP, TD_DIRSEP);
103
          snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
104
                   TD_DIRSEP, TD_DIRSEP);
105

106
          char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
107
          char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
108
          char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
109
          char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
110
          char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
111
          int32_t algorithm = 0;
×
112
          int32_t cfgAlgorithm = 0;
×
113
          int32_t metaAlgorithm = 0;
×
114
          int32_t fileVersion = 0;
×
115
          int32_t keyVersion = 0;
×
116
          int64_t createTime = 0;
×
117
          int64_t svrKeyUpdateTime = 0;
×
118
          int64_t dbKeyUpdateTime = 0;
×
119

120
          int32_t code =
121
              taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
122
                                   &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
123
                                   &svrKeyUpdateTime, &dbKeyUpdateTime);
124
          if (code == 0) {
×
125
            // Update global variables with reloaded keys
126
            tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
127
            tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
128
            tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
129
            tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
130
            tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
131
            tsEncryptAlgorithmType = algorithm;
×
132
            tsCfgAlgorithm = cfgAlgorithm;
×
133
            tsMetaAlgorithm = metaAlgorithm;
×
134
            tsEncryptFileVersion = fileVersion;
×
135
            tsEncryptKeyVersion = keyVersion;
×
136
            tsEncryptKeyCreateTime = createTime;
×
137
            tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
138
            tsDbKeyUpdateTime = dbKeyUpdateTime;
×
139

140
            // Check if keys are still expired after reload
141
            svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
142
            dbKeyAge = curTime - tsDbKeyUpdateTime;
×
NEW
143
            if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
×
NEW
144
              dError("encryption keys are still expired after reload (threshold:%d days), svrKeyAge:%" PRId64
×
145
                     " days, dbKeyAge:%" PRId64 " days, please rotate keys",
146
                     tsKeyExpirationDays, svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
147
            } else {
NEW
148
              dInfo("successfully reloaded encryption keys, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
149
                    " days (threshold:%d days)",
150
                    svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY, tsKeyExpirationDays);
151
            }
152
          } else {
153
            dError("failed to reload encryption keys since %s", tstrerror(code));
×
154
          }
155
#endif
156
        }
157
      } else if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_DISABLED) {
1,063,970✔
158
        dInfo("encryption keys are disabled, stopping key sync thread");
521,725✔
159
        break;
521,725✔
160
      } else {
161
        dmSendKeySyncReq(pMgmt);
542,245✔
162
      }
163
      lastTime = curTime;
577,597✔
164
    }
165
  }
166
  return NULL;
556,805✔
167
}
168

169
static void *dmStatusInfoThreadFp(void *param) {
556,805✔
170
  SDnodeMgmt *pMgmt = param;
556,805✔
171
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
172
  setThreadName("dnode-status-info");
556,805✔
173

174
  int32_t upTimeCount = 0;
556,805✔
175
  int64_t upTime = 0;
556,805✔
176

177
  while (1) {
1,043,692,242✔
178
    taosMsleep(50);
1,044,249,047✔
179
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,044,249,047✔
180

181
    int64_t curTime = taosGetTimestampMs();
1,043,692,242✔
182
    if (curTime < lastTime) lastTime = curTime;
1,043,692,242✔
183
    float interval = curTime - lastTime;
1,043,692,242✔
184
    if (interval >= tsStatusIntervalMs) {
1,043,692,242✔
185
      dmUpdateStatusInfo(pMgmt);
51,977,971✔
186
      lastTime = curTime;
51,977,971✔
187

188
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
51,977,971✔
189
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
618,603✔
190
        if (upTime > 0) tsDndUpTime = upTime;
618,603✔
191
      }
192
    }
193
  }
194

195
  return NULL;
556,805✔
196
}
197

198
#if defined(TD_ENTERPRISE)
199
SDmNotifyHandle dmNotifyHdl = {.state = 0};
200
#define TIMESERIES_STASH_NUM 5
201
static void *dmNotifyThreadFp(void *param) {
556,805✔
202
  SDnodeMgmt *pMgmt = param;
556,805✔
203
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
204
  setThreadName("dnode-notify");
556,805✔
205

206
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
556,805✔
207
    return NULL;
×
208
  }
209

210
  // calculate approximate timeSeries per second
211
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
556,602✔
212
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
556,602✔
213
  int64_t  approximateTimeSeries = 0;
556,805✔
214
  uint64_t nTotalNotify = 0;
556,805✔
215
  int32_t  head, tail = 0;
556,805✔
216

217
  bool       wait = true;
556,805✔
218
  int32_t    nDnode = 0;
556,805✔
219
  int64_t    lastNotify = 0;
556,805✔
220
  int64_t    lastFetchDnode = 0;
556,805✔
221
  SNotifyReq req = {0};
556,805✔
222
  while (1) {
41,261,293✔
223
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
41,818,098✔
224
    if (wait) tsem_wait(&dmNotifyHdl.sem);
41,261,293✔
225
    atomic_store_8(&dmNotifyHdl.state, 1);
41,261,293✔
226

227
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
41,261,293✔
228
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
41,261,293✔
229
      goto _skip;
41,261,293✔
230
    }
231
    int64_t current = taosGetTimestampMs();
×
232
    if (current - lastFetchDnode > 1000) {
×
233
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
234
      if (nDnode < 1) nDnode = 1;
×
235
      lastFetchDnode = current;
×
236
    }
237
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
238
      req.dnodeId = pMgmt->pData->dnodeId;
×
239
      req.clusterId = pMgmt->pData->clusterId;
×
240
    }
241

242
    if (current - lastNotify < 10) {
×
243
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
244
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
245
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
246
        taosMsleep(10);
×
247
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
248
        taosMsleep(5);
×
249
      } else {
250
        taosMsleep(2);
×
251
      }
252
    }
253

254
    SMonVloadInfo vinfo = {0};
×
255
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
256
    req.pVloads = vinfo.pVloads;
×
257
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
258
    int64_t nTimeSeries = 0;
×
259
    for (int32_t i = 0; i < nVgroup; ++i) {
×
260
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
261
      nTimeSeries += vload->nTimeSeries;
×
262
    }
263
    notifyTimeSeries[tail] = nTimeSeries;
×
264
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
265
    ++nTotalNotify;
×
266

267
    approximateTimeSeries = 0;
×
268
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
269
      head = tail - TIMESERIES_STASH_NUM + 1;
×
270
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
271
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
272
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
273
      if (tsDiff > 0) {
×
274
        if (timeDiff > 0 && timeDiff < 1e9) {
×
275
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
276
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
277
            dmSendNotifyReq(pMgmt, &req);
×
278
          }
279
        } else {
280
          dmSendNotifyReq(pMgmt, &req);
×
281
        }
282
      }
283
    } else {
284
      dmSendNotifyReq(pMgmt, &req);
×
285
    }
286
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
287

288
    tFreeSNotifyReq(&req);
×
289
    lastNotify = taosGetTimestampMs();
×
290
  _skip:
41,261,293✔
291
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
41,261,293✔
292
      wait = true;
41,252,052✔
293
      continue;
41,252,052✔
294
    }
295
    wait = false;
9,241✔
296
  }
297

298
  return NULL;
556,805✔
299
}
300
#endif
301

302
#ifdef USE_MONITOR
303
static void *dmMonitorThreadFp(void *param) {
556,805✔
304
  SDnodeMgmt *pMgmt = param;
556,805✔
305
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
306
  int64_t     lastTimeForBasic = taosGetTimestampMs();
556,805✔
307
  setThreadName("dnode-monitor");
556,805✔
308

309
  static int32_t TRIM_FREQ = 20;
310
  int32_t        trimCount = 0;
556,805✔
311

312
  while (1) {
262,061,170✔
313
    taosMsleep(200);
262,617,975✔
314
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
262,617,975✔
315

316
    int64_t curTime = taosGetTimestampMs();
262,061,170✔
317

318
    if (curTime < lastTime) lastTime = curTime;
262,061,170✔
319
    float interval = (curTime - lastTime) / 1000.0f;
262,061,170✔
320
    if (interval >= tsMonitorInterval) {
262,061,170✔
321
      (*pMgmt->sendMonitorReportFp)();
1,506,818✔
322
      (*pMgmt->monitorCleanExpiredSamplesFp)();
1,506,818✔
323
      lastTime = curTime;
1,506,818✔
324

325
      trimCount = (trimCount + 1) % TRIM_FREQ;
1,506,818✔
326
      if (trimCount == 0) {
1,506,818✔
327
        taosMemoryTrim(0, NULL);
9,386✔
328
      }
329
    }
330
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
262,061,170✔
331
      taosMemoryTrim(0, NULL);
9,821,756✔
332
    }
333
  }
334

335
  return NULL;
556,805✔
336
}
337
#endif
338
#ifdef USE_AUDIT
339
static void *dmAuditThreadFp(void *param) {
556,805✔
340
  SDnodeMgmt *pMgmt = param;
556,805✔
341
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
342
  setThreadName("dnode-audit");
556,805✔
343

344
  while (1) {
523,597,889✔
345
    taosMsleep(100);
524,154,694✔
346
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
524,154,694✔
347

348
    int64_t curTime = taosGetTimestampMs();
523,597,889✔
349
    if (curTime < lastTime) lastTime = curTime;
523,597,889✔
350
    float interval = curTime - lastTime;
523,597,889✔
351
    if (interval >= tsAuditInterval) {
523,597,889✔
352
      (*pMgmt->sendAuditRecordsFp)();
10,210,441✔
353
      lastTime = curTime;
10,210,441✔
354
    }
355
  }
356

357
  return NULL;
556,805✔
358
}
359
#endif
360
#ifdef USE_REPORT
361
static void *dmCrashReportThreadFp(void *param) {
×
362
  int32_t     code = 0;
×
363
  SDnodeMgmt *pMgmt = param;
×
364
  int64_t     lastTime = taosGetTimestampMs();
×
365
  setThreadName("dnode-crashReport");
×
366
  char filepath[PATH_MAX] = {0};
×
367
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
368
  char     *pMsg = NULL;
×
369
  int64_t   msgLen = 0;
×
370
  TdFilePtr pFile = NULL;
×
371
  bool      truncateFile = false;
×
372
  int32_t   sleepTime = 200;
×
373
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
374
  int32_t   loopTimes = reportPeriodNum;
×
375

376
  STelemAddrMgmt mgt = {0};
×
377
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
378
  if (code != 0) {
×
379
    dError("failed to init telemetry since %s", tstrerror(code));
×
380
    return NULL;
×
381
  }
382
  code = initCrashLogWriter();
×
383
  if (code != 0) {
×
384
    dError("failed to init crash log writer since %s", tstrerror(code));
×
385
    return NULL;
×
386
  }
387

388
  while (1) {
389
    checkAndPrepareCrashInfo();
×
390
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
391
      break;
×
392
    }
393
    if (loopTimes++ < reportPeriodNum) {
×
394
      taosMsleep(sleepTime);
×
395
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
396
      continue;
×
397
    }
398
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
399
    if (pMsg && msgLen > 0) {
×
400
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
401
        dError("failed to send crash report");
×
402
        if (pFile) {
×
403
          taosReleaseCrashLogFile(pFile, false);
×
404
          pFile = NULL;
×
405

406
          taosMsleep(sleepTime);
×
407
          loopTimes = 0;
×
408
          continue;
×
409
        }
410
      } else {
411
        dInfo("succeed to send crash report");
×
412
        truncateFile = true;
×
413
      }
414
    } else {
415
      dInfo("no crash info was found");
×
416
    }
417

418
    taosMemoryFree(pMsg);
×
419

420
    if (pMsg && msgLen > 0) {
×
421
      pMsg = NULL;
×
422
      continue;
×
423
    }
424

425
    if (pFile) {
×
426
      taosReleaseCrashLogFile(pFile, truncateFile);
×
427
      pFile = NULL;
×
428
      truncateFile = false;
×
429
    }
430

431
    taosMsleep(sleepTime);
×
432
    loopTimes = 0;
×
433
  }
434
  taosTelemetryDestroy(&mgt);
×
435

436
  return NULL;
×
437
}
438
#endif
439

440
static void *dmMetricsThreadFp(void *param) {
556,805✔
441
  SDnodeMgmt *pMgmt = param;
556,805✔
442
  int64_t     lastTime = taosGetTimestampMs();
556,805✔
443
  setThreadName("dnode-metrics");
556,805✔
444
  while (1) {
262,058,148✔
445
    taosMsleep(200);
262,614,953✔
446
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
262,614,953✔
447

448
    int64_t curTime = taosGetTimestampMs();
262,058,148✔
449
    if (curTime < lastTime) lastTime = curTime;
262,058,148✔
450
    float interval = (curTime - lastTime) / 1000.0f;
262,058,148✔
451
    if (interval >= tsMetricsInterval) {
262,058,148✔
452
      (*pMgmt->sendMetricsReportFp)();
1,505,363✔
453
      (*pMgmt->metricsCleanExpiredSamplesFp)();
1,505,363✔
454
      lastTime = curTime;
1,505,363✔
455
    }
456
  }
457
  return NULL;
556,805✔
458
}
459

460
/**
 * Starts the joinable status thread (dmStatusThreadFp) and stores its handle
 * in pMgmt->statusThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  bool created = (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create status thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
478

479
/**
 * Starts the joinable config thread (dmConfigThreadFp) and stores its handle
 * in pMgmt->configThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  bool created = (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create config thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("config-status", "initialized");
  return 0;
}
497

498
/**
 * Starts the joinable key-sync thread (dmKeySyncThreadFp) and stores its
 * handle in pMgmt->keySyncThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartKeySyncThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  bool created = (taosThreadCreate(&pMgmt->keySyncThread, &thAttr, dmKeySyncThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create key sync thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-keysync", "initialized");
  return 0;
}
516

517
/**
 * Starts the joinable status-info thread (dmStatusInfoThreadFp) and stores
 * its handle in pMgmt->statusInfoThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  bool created = (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create status Info thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
535

536
/**
 * Joins the status thread and clears its handle.
 * Does nothing when pMgmt->statusThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
556,805✔
542

543
/**
 * Joins the config thread and clears its handle.
 * Does nothing when pMgmt->configThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
556,805✔
549

550
/**
 * Joins the key-sync thread and clears its handle.
 * Does nothing when pMgmt->keySyncThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopKeySyncThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->keySyncThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->keySyncThread, NULL);
  taosThreadClear(&pMgmt->keySyncThread);
}
556,805✔
556

557
/**
 * Joins the status-info thread and clears its handle.
 * Does nothing when pMgmt->statusInfoThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
556,805✔
563
#ifdef TD_ENTERPRISE
564
/**
 * Starts the joinable notify thread (dmNotifyThreadFp) and stores its handle
 * in pMgmt->notifyThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  bool created = (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create notify thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
579

580
/**
 * Stops the notify thread and destroys the notify semaphore.
 *
 * When the thread handle is valid, the semaphore is posted first (the thread
 * may be blocked waiting on it), then the thread is joined and its handle
 * cleared. The semaphore is destroyed in every case; failures to post or
 * destroy are only logged.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  bool running = taosCheckPthreadValid(pMgmt->notifyThread);
  if (running) {
    int32_t postRet = tsem_post(&dmNotifyHdl.sem);
    if (postRet != 0) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  int32_t destroyRet = tsem_destroy(&dmNotifyHdl.sem);
  if (destroyRet != 0) {
    dError("failed to destroy notify sem");
  }
}
556,805✔
593
#endif
594
/**
 * Starts the joinable monitor thread (dmMonitorThreadFp) and stores its
 * handle in pMgmt->monitorThread. Without USE_MONITOR this is a no-op that
 * returns 0.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  bool created = (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create monitor thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
611

612
/**
 * Starts the joinable audit thread (dmAuditThreadFp) and stores its handle
 * in pMgmt->auditThread. Without USE_AUDIT this is a no-op that returns 0.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  bool created = (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create audit thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
629

630
/**
 * Starts the joinable metrics thread (dmMetricsThreadFp) and stores its
 * handle in pMgmt->metricsThread. Without USE_MONITOR this is a no-op that
 * returns 0.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success, otherwise the system error code of the failed
 *         thread creation.
 */
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  bool created = (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create metrics thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-metrics", "initialized");
#endif
  return 0;
}
647

648
/**
 * Joins the monitor thread and clears its handle.
 * Does nothing without USE_MONITOR or when pMgmt->monitorThread is not a
 * valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
556,805✔
656

657
/**
 * Joins the audit thread and clears its handle.
 * Does nothing without USE_AUDIT or when pMgmt->auditThread is not a valid
 * thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
556,805✔
665

666
/**
 * Starts the joinable crash-report thread (dmCrashReportThreadFp) and stores
 * its handle in pMgmt->crashReportThread. Returns 0 without starting
 * anything when USE_REPORT is not defined or tsEnableCrashReport is false.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; the error code is captured before that call so it cannot be
 * disturbed by it.
 *
 * @param pMgmt Dnode management context passed to the thread.
 * @return 0 on success or when crash reporting is off, otherwise the system
 *         error code of the failed thread creation.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  bool created = (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) == 0);
  if (!created) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    dError("failed to create crashReport thread since %s", tstrerror(code));
    return code;
  }

  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
687

688
/**
 * Joins the crash-report thread and clears its handle.
 * Does nothing without USE_REPORT, when tsEnableCrashReport is false, or
 * when pMgmt->crashReportThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return;
  }
  if (!taosCheckPthreadValid(pMgmt->crashReportThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
  taosThreadClear(&pMgmt->crashReportThread);
#endif
}
700

701
/**
 * Joins the metrics thread and clears its handle.
 * Does nothing when pMgmt->metricsThread is not a valid thread handle.
 *
 * @param pMgmt Dnode management context owning the thread handle.
 */
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->metricsThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->metricsThread, NULL);
  taosThreadClear(&pMgmt->metricsThread);
}
556,805✔
707

708
/**
 * Queue handler for the dnode management worker.
 *
 * Dispatches the message to the handler matching its type. Messages of an
 * unknown type are logged and answered with TSDB_CODE_MSG_NOT_PROCESSED.
 * For request messages a response is sent carrying the handler result (or
 * terrno, when the handler failed and terrno is set) together with whatever
 * the handler left in pMsg->info.rsp / rspLen. The message content and the
 * queue item are always freed before returning.
 *
 * @param pInfo Queue info whose ahandle is the SDnodeMgmt context.
 * @param pMsg  Message taken from the queue; freed by this function.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  STraceId   *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    // dnode configuration and mnode responses
    case TDMT_DND_CONFIG_DNODE: {
      code = dmProcessConfigReq(pMgmt, pMsg);
    } break;
    case TDMT_MND_AUTH_RSP: {
      code = dmProcessAuthRsp(pMgmt, pMsg);
    } break;
    case TDMT_MND_GRANT_RSP: {
      code = dmProcessGrantRsp(pMgmt, pMsg);
    } break;

    // node lifecycle, forwarded to the callbacks registered on pMgmt
    case TDMT_DND_CREATE_MNODE: {
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
    } break;
    case TDMT_DND_DROP_MNODE: {
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
    } break;
    case TDMT_DND_CREATE_QNODE: {
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
    } break;
    case TDMT_DND_DROP_QNODE: {
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
    } break;
    case TDMT_DND_CREATE_SNODE: {
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
    } break;
    case TDMT_DND_ALTER_SNODE: {
      code = (*pMgmt->processAlterNodeFp)(SNODE, pMsg);
    } break;
    case TDMT_DND_DROP_SNODE: {
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
    } break;
    case TDMT_DND_CREATE_BNODE: {
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
    } break;
    case TDMT_DND_DROP_BNODE: {
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
    } break;
    case TDMT_DND_ALTER_MNODE_TYPE: {
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
    } break;

    // status and system tables
    case TDMT_DND_SERVER_STATUS: {
      code = dmProcessServerRunStatus(pMgmt, pMsg);
    } break;
    case TDMT_DND_SYSTABLE_RETRIEVE: {
      code = dmProcessRetrieve(pMgmt, pMsg);
    } break;

    // grants
    case TDMT_MND_GRANT: {
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
    } break;
    case TDMT_MND_GRANT_NOTIFY: {
      code = dmProcessGrantNotify(NULL, pMsg);
    } break;

    // encryption keys and TLS
    case TDMT_DND_CREATE_ENCRYPT_KEY: {
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
    } break;
    case TDMT_MND_ALTER_ENCRYPT_KEY: {
      code = dmProcessAlterEncryptKeyReq(pMgmt, pMsg);
    } break;
    case TDMT_MND_ALTER_KEY_EXPIRATION: {
      code = dmProcessAlterKeyExpirationReq(pMgmt, pMsg);
    } break;
    case TDMT_DND_RELOAD_DNODE_TLS: {
      code = dmProcessReloadTlsConfig(pMgmt, pMsg);
    } break;

    default: {
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
    } break;
  }

  if (IsReq(pMsg)) {
    // Prefer the more specific terrno when the handler reported a failure.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg rsp = {0};
    rsp.code = code;
    rsp.pCont = pMsg->info.rsp;
    rsp.contLen = pMsg->info.rspLen;
    rsp.info = pMsg->info;

    code = rpcSendResponse(&rsp);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
82,049,594✔
805

806
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
17,751,689✔
807
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
17,751,689✔
808
  if (pMsg->code) {
17,751,689✔
809
    *pWorkerIdx = 0;
171,656✔
810
    return TSDB_CODE_SUCCESS;
171,656✔
811
  }
812
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
17,580,033✔
813
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
17,580,033✔
814
  return TSDB_CODE_SUCCESS;
17,580,033✔
815
}
816

817

818
// Queue handler for the dnode stream-mgmt workers.
//
// Routes the message by type (only TDMT_MND_STREAM_HEARTBEAT_RSP is handled),
// sends a response when the message is a request, and finally releases both the
// message content and the queue item.
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  // NOTE(review): `trace` is not passed explicitly to the dG* macros below;
  // presumably they pick it up by name, so the identifier must stay as is.
  STraceId *trace = &pMsg->info.traceId;
  int32_t   ret = -1;

  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  if (pMsg->msgType == TDMT_MND_STREAM_HEARTBEAT_RSP) {
    ret = dmProcessStreamHbRsp(pMgmt, pMsg);
  } else {
    ret = TSDB_CODE_MSG_NOT_PROCESSED;
    dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(ret));
  }

  if (IsReq(pMsg)) {
    // Prefer the thread's terrno over the handler's return value when both
    // indicate a failure.
    if (ret != 0 && terrno != 0) {
      ret = terrno;
    }

    SRpcMsg reply = {0};
    reply.code = ret;
    reply.pCont = pMsg->info.rsp;
    reply.contLen = pMsg->info.rspLen;
    reply.info = pMsg->info;

    // From here on `ret` holds the outcome of sending the response.
    ret = rpcSendResponse(&reply);
    if (ret != 0) {
      dError("failed to send response since %s", tstrerror(ret));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, ret);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
17,751,689✔
853

854

855
// Initializes the dnode workers: the single "dnode-mgmt" worker served by
// dmProcessMgmtQueue, and the "dnode-stream-mgmt" dispatch pool served by
// dmProcessStreamMgmtQueue with dmDispatchStreamHbMsg as its dispatcher.
//
// Returns 0 on success, or the error code of the first step that failed.
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  // The pool size follows the configured number of stream-mgmt threads.
  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;
  pStreamPool->max = tsNumOfStreamMgmtThreads;
  pStreamPool->name = "dnode-stream-mgmt";

  if ((code = tDispatchWorkerInit(pStreamPool)) != 0) {
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  if ((code = tDispatchWorkerAllocQueue(pStreamPool, pMgmt, (FItem)dmProcessStreamMgmtQueue,
                                        dmDispatchStreamHbMsg)) != 0) {
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
886

887
// Cleans up the workers created by dmStartWorker: first the single mgmt worker,
// then the stream-mgmt dispatch pool.
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker       *pMgmtWorker = &pMgmt->mgmtWorker;
  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;

  tSingleWorkerCleanup(pMgmtWorker);
  tDispatchWorkerCleanup(pStreamPool);

  dDebug("dnode workers are closed");
}
556,805✔
892

893
// Writes pMsg into the queue of the dnode mgmt worker and returns the result
// of taosWriteQitem.
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);

  int32_t code = taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
  return code;
}
898

899
// Hands pMsg to the stream-mgmt dispatch worker pool and returns the result of
// tAddTaskIntoDispatchWorkerPool.
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;
  int32_t              code = tAddTaskIntoDispatchWorkerPool(pStreamPool, pMsg);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc