• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.79
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
22
#include "taoskInt.h"
23
#endif
24

25
// Encryption key expiration constants
26
#define MILLISECONDS_PER_DAY (24 * 3600 * 1000)
27

28
static void *dmStatusThreadFp(void *param) {
670,555✔
29
  SDnodeMgmt *pMgmt = param;
670,555✔
30
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
31
  setThreadName("dnode-status");
670,555✔
32
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
33

34
  while (1) {
1,053,443,412✔
35
    taosMsleep(50);
1,054,113,967✔
36
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,054,113,967✔
37

38
    int64_t curTime = taosGetTimestampMs();
1,053,443,412✔
39
    if (curTime < lastTime) lastTime = curTime;
1,053,443,412✔
40
    float interval = curTime - lastTime;
1,053,443,412✔
41
    if (interval >= tsStatusIntervalMs) {
1,053,443,412✔
42
      dmSendStatusReq(pMgmt);
52,777,661✔
43
      lastTime = curTime;
52,777,661✔
44
    }
45
  }
46

47
  return NULL;
670,555✔
48
}
49

50
static void *dmConfigThreadFp(void *param) {
670,555✔
51
  SDnodeMgmt *pMgmt = param;
670,555✔
52
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
53
  setThreadName("dnode-config");
670,555✔
54
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
55
  while (1) {
13,392,268✔
56
    taosMsleep(50);
14,062,823✔
57
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
14,062,823✔
58

59
    int64_t curTime = taosGetTimestampMs();
13,392,268✔
60
    if (curTime < lastTime) lastTime = curTime;
13,392,268✔
61
    float interval = curTime - lastTime;
13,392,268✔
62
    if (interval >= tsStatusIntervalMs) {
13,392,268✔
63
      dmSendConfigReq(pMgmt);
672,690✔
64
      lastTime = curTime;
672,690✔
65
    }
66
  }
67
  return NULL;
670,555✔
68
}
69

70
static void *dmKeySyncThreadFp(void *param) {
670,555✔
71
  SDnodeMgmt *pMgmt = param;
670,555✔
72
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
73
  setThreadName("dnode-keysync");
670,555✔
74
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
75

76
  // Wait a bit before first sync attempt
77
  taosMsleep(3000);
670,555✔
78

79
  while (1) {
6,593,492✔
80
    taosMsleep(100);
7,264,047✔
81
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
7,264,047✔
82

83
    int64_t curTime = taosGetTimestampMs();
7,207,843✔
84
    if (curTime < lastTime) lastTime = curTime;
7,207,843✔
85
    float interval = curTime - lastTime;
7,207,843✔
86
    if (interval >= tsStatusIntervalMs) {
7,207,843✔
87
      // Sync keys periodically (every 30 seconds) or on first run
88
      if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
1,307,717✔
89
        // Check if encryption keys are expired based on configured threshold
90
        int64_t keyExpirationThreshold = (int64_t)tsKeyExpirationDays * MILLISECONDS_PER_DAY;
47,902✔
91
        int64_t svrKeyAge = curTime - tsSvrKeyUpdateTime;
47,902✔
92
        int64_t dbKeyAge = curTime - tsDbKeyUpdateTime;
47,902✔
93

94
        if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
47,902✔
95
          const char *action = (strcmp(tsKeyExpirationStrategy, "ALARM") == 0) ? "warning" : "attempting reload";
×
96
          dWarn("encryption keys may be expired (threshold:%d days, strategy:%s), svrKeyAge:%" PRId64
×
97
                " days, dbKeyAge:%" PRId64 " days, %s",
98
                tsKeyExpirationDays, tsKeyExpirationStrategy, svrKeyAge / MILLISECONDS_PER_DAY,
99
                dbKeyAge / MILLISECONDS_PER_DAY, action);
100
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
101
          // Try to reload keys from file
102
          char masterKeyFile[PATH_MAX] = {0};
×
103
          char derivedKeyFile[PATH_MAX] = {0};
×
104
          snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
105
                   TD_DIRSEP, TD_DIRSEP);
106
          snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
107
                   TD_DIRSEP, TD_DIRSEP);
108

109
          char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
110
          char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
111
          char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
112
          char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
113
          char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
114
          int32_t algorithm = 0;
×
115
          int32_t cfgAlgorithm = 0;
×
116
          int32_t metaAlgorithm = 0;
×
117
          int32_t fileVersion = 0;
×
118
          int32_t keyVersion = 0;
×
119
          int64_t createTime = 0;
×
120
          int64_t svrKeyUpdateTime = 0;
×
121
          int64_t dbKeyUpdateTime = 0;
×
122

123
          int32_t code =
124
              taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
125
                                   &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
126
                                   &svrKeyUpdateTime, &dbKeyUpdateTime);
127
          if (code == 0) {
×
128
            // Update global variables with reloaded keys
129
            tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
130
            tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
131
            tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
132
            tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
133
            tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
134
            tsEncryptAlgorithmType = algorithm;
×
135
            tsCfgAlgorithm = cfgAlgorithm;
×
136
            tsMetaAlgorithm = metaAlgorithm;
×
137
            tsEncryptFileVersion = fileVersion;
×
138
            tsEncryptKeyVersion = keyVersion;
×
139
            tsEncryptKeyCreateTime = createTime;
×
140
            tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
141
            tsDbKeyUpdateTime = dbKeyUpdateTime;
×
142

143
            // Check if keys are still expired after reload
144
            svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
145
            dbKeyAge = curTime - tsDbKeyUpdateTime;
×
146
            if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
×
147
              dError("encryption keys are still expired after reload (threshold:%d days), svrKeyAge:%" PRId64
×
148
                     " days, dbKeyAge:%" PRId64 " days, please rotate keys",
149
                     tsKeyExpirationDays, svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
150
            } else {
151
              dInfo("successfully reloaded encryption keys, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
152
                    " days (threshold:%d days)",
153
                    svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY, tsKeyExpirationDays);
154
            }
155
          } else {
156
            dError("failed to reload encryption keys since %s", tstrerror(code));
×
157
          }
158
#endif
159
        }
160
      } else if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_DISABLED) {
1,259,815✔
161
        dInfo("encryption keys are disabled, stopping key sync thread");
614,351✔
162
        break;
614,351✔
163
      } else {
164
        dmSendKeySyncReq(pMgmt);
645,464✔
165
      }
166
      lastTime = curTime;
693,366✔
167
    }
168
  }
169
  return NULL;
670,555✔
170
}
171

172
static void *dmStatusInfoThreadFp(void *param) {
670,555✔
173
  SDnodeMgmt *pMgmt = param;
670,555✔
174
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
175
  setThreadName("dnode-status-info");
670,555✔
176
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
177

178
  int32_t upTimeCount = 0;
670,555✔
179
  int64_t upTime = 0;
670,555✔
180

181
  while (1) {
1,069,298,124✔
182
    taosMsleep(50);
1,069,968,679✔
183
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
1,069,968,679✔
184

185
    int64_t curTime = taosGetTimestampMs();
1,069,298,124✔
186
    if (curTime < lastTime) lastTime = curTime;
1,069,298,124✔
187
    float interval = curTime - lastTime;
1,069,298,124✔
188
    if (interval >= tsStatusIntervalMs) {
1,069,298,124✔
189
      dmUpdateStatusInfo(pMgmt);
53,340,837✔
190
      lastTime = curTime;
53,340,837✔
191

192
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
53,340,837✔
193
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
606,238✔
194
        if (upTime > 0) tsDndUpTime = upTime;
606,238✔
195
      }
196
    }
197
  }
198

199
  return NULL;
670,555✔
200
}
201

202
#if defined(TD_ENTERPRISE)
203
SDmNotifyHandle dmNotifyHdl = {.state = 0};
204
#define TIMESERIES_STASH_NUM 5
205
static void *dmNotifyThreadFp(void *param) {
670,555✔
206
  SDnodeMgmt *pMgmt = param;
670,555✔
207
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
208
  setThreadName("dnode-notify");
670,555✔
209
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
210

211
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
670,555✔
212
    return NULL;
×
213
  }
214

215
  // calculate approximate timeSeries per second
216
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
669,452✔
217
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
669,452✔
218
  int64_t  approximateTimeSeries = 0;
670,555✔
219
  uint64_t nTotalNotify = 0;
670,555✔
220
  int32_t  head, tail = 0;
670,555✔
221

222
  bool       wait = true;
670,555✔
223
  int32_t    nDnode = 0;
670,555✔
224
  int64_t    lastNotify = 0;
670,555✔
225
  int64_t    lastFetchDnode = 0;
670,555✔
226
  SNotifyReq req = {0};
670,555✔
227
  while (1) {
55,190,961✔
228
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
55,861,516✔
229
    if (wait) tsem_wait(&dmNotifyHdl.sem);
55,190,961✔
230
    atomic_store_8(&dmNotifyHdl.state, 1);
55,190,961✔
231

232
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
55,190,961✔
233
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
55,190,961✔
234
      goto _skip;
55,190,961✔
235
    }
236
    int64_t current = taosGetTimestampMs();
×
237
    if (current - lastFetchDnode > 1000) {
×
238
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
239
      if (nDnode < 1) nDnode = 1;
×
240
      lastFetchDnode = current;
×
241
    }
242
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
243
      req.dnodeId = pMgmt->pData->dnodeId;
×
244
      req.clusterId = pMgmt->pData->clusterId;
×
245
    }
246

247
    if (current - lastNotify < 10) {
×
248
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
249
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
250
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
251
        taosMsleep(10);
×
252
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
253
        taosMsleep(5);
×
254
      } else {
255
        taosMsleep(2);
×
256
      }
257
    }
258

259
    SMonVloadInfo vinfo = {0};
×
260
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
261
    req.pVloads = vinfo.pVloads;
×
262
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
263
    int64_t nTimeSeries = 0;
×
264
    for (int32_t i = 0; i < nVgroup; ++i) {
×
265
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
266
      nTimeSeries += vload->nTimeSeries;
×
267
    }
268
    notifyTimeSeries[tail] = nTimeSeries;
×
269
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
270
    ++nTotalNotify;
×
271

272
    approximateTimeSeries = 0;
×
273
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
274
      head = tail - TIMESERIES_STASH_NUM + 1;
×
275
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
276
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
277
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
278
      if (tsDiff > 0) {
×
279
        if (timeDiff > 0 && timeDiff < 1e9) {
×
280
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
281
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
282
            dmSendNotifyReq(pMgmt, &req);
×
283
          }
284
        } else {
285
          dmSendNotifyReq(pMgmt, &req);
×
286
        }
287
      }
288
    } else {
289
      dmSendNotifyReq(pMgmt, &req);
×
290
    }
291
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
292

293
    tFreeSNotifyReq(&req);
×
294
    lastNotify = taosGetTimestampMs();
×
295
  _skip:
55,190,961✔
296
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
55,190,961✔
297
      wait = true;
55,183,583✔
298
      continue;
55,183,583✔
299
    }
300
    wait = false;
7,378✔
301
  }
302

303
  return NULL;
670,555✔
304
}
305
#endif
306

307
#ifdef USE_MONITOR
308
static void *dmMonitorThreadFp(void *param) {
670,555✔
309
  SDnodeMgmt *pMgmt = param;
670,555✔
310
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
311
  int64_t     lastTimeForBasic = taosGetTimestampMs();
670,555✔
312
  setThreadName("dnode-monitor");
670,555✔
313
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
314

315
  static int32_t TRIM_FREQ = 20;
316
  int32_t        trimCount = 0;
670,555✔
317

318
  while (1) {
269,866,171✔
319
    taosMsleep(200);
270,536,726✔
320
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
270,536,726✔
321

322
    int64_t curTime = taosGetTimestampMs();
269,866,171✔
323

324
    if (curTime < lastTime) lastTime = curTime;
269,866,171✔
325
    float interval = (curTime - lastTime) / 1000.0f;
269,866,171✔
326
    if (interval >= tsMonitorInterval) {
269,866,171✔
327
      (*pMgmt->sendMonitorReportFp)();
1,517,827✔
328
      (*pMgmt->monitorCleanExpiredSamplesFp)();
1,517,827✔
329
      lastTime = curTime;
1,517,827✔
330

331
      trimCount = (trimCount + 1) % TRIM_FREQ;
1,517,827✔
332
      if (trimCount == 0) {
1,517,827✔
333
        taosMemoryTrim(0, NULL);
5,827✔
334
      }
335
    }
336
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
269,866,171✔
337
      taosMemoryTrim(0, NULL);
9,983,300✔
338
    }
339
  }
340

341
  return NULL;
670,555✔
342
}
343
#endif
344
#ifdef USE_AUDIT
345
static void *dmAuditThreadFp(void *param) {
670,555✔
346
  SDnodeMgmt *pMgmt = param;
670,555✔
347
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
348
  setThreadName("dnode-audit");
670,555✔
349
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
350

351
  while (1) {
538,658,709✔
352
    taosMsleep(100);
539,329,264✔
353
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
539,329,264✔
354

355
    int64_t curTime = taosGetTimestampMs();
538,658,709✔
356
    if (curTime < lastTime) lastTime = curTime;
538,658,709✔
357
    float interval = curTime - lastTime;
538,658,709✔
358
    if (interval >= tsAuditInterval) {
538,658,709✔
359
      (*pMgmt->sendAuditRecordsFp)();
10,457,611✔
360
      lastTime = curTime;
10,457,611✔
361
    }
362
  }
363

364
  return NULL;
670,555✔
365
}
366
#endif
367
#ifdef USE_REPORT
368
static void *dmCrashReportThreadFp(void *param) {
×
369
  int32_t     code = 0;
×
370
  SDnodeMgmt *pMgmt = param;
×
371
  int64_t     lastTime = taosGetTimestampMs();
×
372
  setThreadName("dnode-crashReport");
×
NEW
373
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
×
374
  char filepath[PATH_MAX] = {0};
×
375
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
376
  char     *pMsg = NULL;
×
377
  int64_t   msgLen = 0;
×
378
  TdFilePtr pFile = NULL;
×
379
  bool      truncateFile = false;
×
380
  int32_t   sleepTime = 200;
×
381
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
382
  int32_t   loopTimes = reportPeriodNum;
×
383

384
  STelemAddrMgmt mgt = {0};
×
385
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
386
  if (code != 0) {
×
387
    dError("failed to init telemetry since %s", tstrerror(code));
×
388
    return NULL;
×
389
  }
390
  code = initCrashLogWriter();
×
391
  if (code != 0) {
×
392
    dError("failed to init crash log writer since %s", tstrerror(code));
×
393
    return NULL;
×
394
  }
395

396
  while (1) {
397
    checkAndPrepareCrashInfo();
×
398
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
399
      break;
×
400
    }
401
    if (loopTimes++ < reportPeriodNum) {
×
402
      taosMsleep(sleepTime);
×
403
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
404
      continue;
×
405
    }
406
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
407
    if (pMsg && msgLen > 0) {
×
408
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
409
        dError("failed to send crash report");
×
410
        if (pFile) {
×
411
          taosReleaseCrashLogFile(pFile, false);
×
412
          pFile = NULL;
×
413

414
          taosMsleep(sleepTime);
×
415
          loopTimes = 0;
×
416
          continue;
×
417
        }
418
      } else {
419
        dInfo("succeed to send crash report");
×
420
        truncateFile = true;
×
421
      }
422
    } else {
423
      dInfo("no crash info was found");
×
424
    }
425

426
    taosMemoryFree(pMsg);
×
427

428
    if (pMsg && msgLen > 0) {
×
429
      pMsg = NULL;
×
430
      continue;
×
431
    }
432

433
    if (pFile) {
×
434
      taosReleaseCrashLogFile(pFile, truncateFile);
×
435
      pFile = NULL;
×
436
      truncateFile = false;
×
437
    }
438

439
    taosMsleep(sleepTime);
×
440
    loopTimes = 0;
×
441
  }
442
  taosTelemetryDestroy(&mgt);
×
443

444
  return NULL;
×
445
}
446
#endif
447

448
static void *dmMetricsThreadFp(void *param) {
670,555✔
449
  SDnodeMgmt *pMgmt = param;
670,555✔
450
  int64_t     lastTime = taosGetTimestampMs();
670,555✔
451
  setThreadName("dnode-metrics");
670,555✔
452
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
670,555✔
453
  while (1) {
269,875,107✔
454
    taosMsleep(200);
270,545,662✔
455
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
270,545,662✔
456

457
    int64_t curTime = taosGetTimestampMs();
269,875,107✔
458
    if (curTime < lastTime) lastTime = curTime;
269,875,107✔
459
    float interval = (curTime - lastTime) / 1000.0f;
269,875,107✔
460
    if (interval >= tsMetricsInterval) {
269,875,107✔
461
      (*pMgmt->sendMetricsReportFp)();
1,516,512✔
462
      (*pMgmt->metricsCleanExpiredSamplesFp)();
1,516,512✔
463
      lastTime = curTime;
1,516,512✔
464
    }
465
  }
466
  return NULL;
670,555✔
467
}
468

469
/*
 * Create the joinable status thread running dmStatusThreadFp and store its
 * handle in pMgmt->statusThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create status thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}
487

488
/*
 * Create the joinable config thread running dmConfigThreadFp and store its
 * handle in pMgmt->configThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create config thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("config-status", "initialized");
  return 0;
}
506

507
/*
 * Create the joinable key-sync thread running dmKeySyncThreadFp and store its
 * handle in pMgmt->keySyncThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartKeySyncThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->keySyncThread, &thAttr, dmKeySyncThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create key sync thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-keysync", "initialized");
  return 0;
}
525

526
/*
 * Create the joinable status-info thread running dmStatusInfoThreadFp and
 * store its handle in pMgmt->statusInfoThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create status Info thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status-info", "initialized");
  return 0;
}
544

545
/*
 * Join the status thread if its handle is valid, then clear the handle.
 * Does nothing when the handle is not valid.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}
670,555✔
551

552
/*
 * Join the config thread if its handle is valid, then clear the handle.
 * Does nothing when the handle is not valid.
 */
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->configThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->configThread, NULL);
  taosThreadClear(&pMgmt->configThread);
}
670,555✔
558

559
/*
 * Join the key-sync thread if its handle is valid, then clear the handle.
 * Does nothing when the handle is not valid.
 */
void dmStopKeySyncThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->keySyncThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->keySyncThread, NULL);
  taosThreadClear(&pMgmt->keySyncThread);
}
670,555✔
565

566
/*
 * Join the status-info thread if its handle is valid, then clear the handle.
 * Does nothing when the handle is not valid.
 */
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusInfoThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
  taosThreadClear(&pMgmt->statusInfoThread);
}
670,555✔
572
#ifdef TD_ENTERPRISE
573
/*
 * Create the joinable notify thread running dmNotifyThreadFp and store its
 * handle in pMgmt->notifyThread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create notify thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-notify", "initialized");
  return 0;
}
588

589
/*
 * Stop the notify thread: when its handle is valid, post dmNotifyHdl.sem so a
 * thread blocked in tsem_wait() wakes up, join it and clear the handle.
 * The semaphore is destroyed afterwards whether or not a thread was joined.
 * Failures of tsem_post()/tsem_destroy() are logged and otherwise ignored.
 */
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
    int postRet = tsem_post(&dmNotifyHdl.sem);
    if (postRet != 0) {
      dError("failed to post notify sem");
    }

    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
    taosThreadClear(&pMgmt->notifyThread);
  }

  int destroyRet = tsem_destroy(&dmNotifyHdl.sem);
  if (destroyRet != 0) {
    dError("failed to destroy notify sem");
  }
}
670,555✔
602
#endif
603
/*
 * Create the joinable monitor thread running dmMonitorThreadFp and store its
 * handle in pMgmt->monitorThread. Compiled to a no-op returning 0 when
 * USE_MONITOR is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create monitor thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
#endif
  return 0;
}
620

621
/*
 * Create the joinable audit thread running dmAuditThreadFp and store its
 * handle in pMgmt->auditThread. Compiled to a no-op returning 0 when
 * USE_AUDIT is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
  int32_t      code = 0;
#ifdef USE_AUDIT
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create audit thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-audit", "initialized");
#endif
  return 0;
}
638

639
/*
 * Create the joinable metrics thread running dmMetricsThreadFp and store its
 * handle in pMgmt->metricsThread. Compiled to a no-op returning 0 when
 * USE_MONITOR is not defined.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_MONITOR
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create metrics thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-metrics", "initialized");
#endif
  return 0;
}
656

657
/*
 * Join the monitor thread if its handle is valid, then clear the handle.
 * Compiled to an empty function when USE_MONITOR is not defined.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
#ifdef USE_MONITOR
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
#endif
}
670,555✔
665

666
/*
 * Join the audit thread if its handle is valid, then clear the handle.
 * Compiled to an empty function when USE_AUDIT is not defined.
 */
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
#ifdef USE_AUDIT
  if (!taosCheckPthreadValid(pMgmt->auditThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->auditThread, NULL);
  taosThreadClear(&pMgmt->auditThread);
#endif
}
670,555✔
674

675
/*
 * Create the joinable crash-report thread running dmCrashReportThreadFp and
 * store its handle in pMgmt->crashReportThread. Does nothing and returns 0
 * when USE_REPORT is not defined or tsEnableCrashReport is false.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 * created. The thread attribute object is destroyed on both paths.
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  int32_t code = 0;
#ifdef USE_REPORT
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // capture the error before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create crashReport thread since %s", tstrerror(code));
    (void)taosThreadAttrDestroy(&thAttr);
    return code;
  }

  (void)taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
#endif
  return 0;
}
696

697
/*
 * Join the crash-report thread and clear its handle, but only when crash
 * reporting is enabled (tsEnableCrashReport) and the handle is valid.
 * Compiled to an empty function when USE_REPORT is not defined.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
#ifdef USE_REPORT
  if (tsEnableCrashReport && taosCheckPthreadValid(pMgmt->crashReportThread)) {
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
    taosThreadClear(&pMgmt->crashReportThread);
  }
#endif
}
709

710
/*
 * Join the metrics thread if its handle is valid, then clear the handle.
 * Does nothing when the handle is not valid.
 */
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->metricsThread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->metricsThread, NULL);
  taosThreadClear(&pMgmt->metricsThread);
}
670,555✔
716

717
/*
 * Worker callback for the dnode management queue.
 *
 * Dispatches pMsg by msgType to the matching handler; unknown types yield
 * TSDB_CODE_MSG_NOT_PROCESSED. For request messages (IsReq) a response is
 * sent carrying the handler's code - replaced by terrno when the handler
 * failed and terrno is set - together with pMsg->info.rsp / rspLen.
 * The message content and the queue item are always freed before returning.
 *
 * pInfo: queue info; pInfo->ahandle is the SDnodeMgmt instance.
 * pMsg:  the queued message; owned by this function.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  // NOTE(review): `trace` is not used directly below; presumably the dG*
  // logging macros reference it by this exact name - confirm before renaming.
  STraceId   *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    /* dnode configuration and mnode responses */
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;

    /* node lifecycle: mnode */
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;

    /* node lifecycle: qnode */
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;

    /* node lifecycle: snode */
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_ALTER_SNODE:
      code = (*pMgmt->processAlterNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;

    /* node lifecycle: bnode */
    case TDMT_DND_CREATE_BNODE:
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
      break;
    case TDMT_DND_DROP_BNODE:
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
      break;

    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;

    /* status and system-table retrieval */
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;

    /* grants */
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;
    case TDMT_MND_GRANT_NOTIFY:
      code = dmProcessGrantNotify(NULL, pMsg);
      break;

    /* encryption keys and TLS */
    case TDMT_DND_CREATE_ENCRYPT_KEY:
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
      break;
    case TDMT_MND_ALTER_ENCRYPT_KEY:
      code = dmProcessAlterEncryptKeyReq(pMgmt, pMsg);
      break;
    case TDMT_MND_ALTER_KEY_EXPIRATION:
      code = dmProcessAlterKeyExpirationReq(pMgmt, pMsg);
      break;
    case TDMT_DND_RELOAD_DNODE_TLS:
      code = dmProcessReloadTlsConfig(pMgmt, pMsg);
      break;

    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
      break;
  }

  if (IsReq(pMsg)) {
    // prefer the thread-local error over the handler's return code on failure
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg reply = {
        .code = code,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
        .info = pMsg->info,
    };

    code = rpcSendResponse(&reply);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
87,904,893✔
814

815
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
18,530,945✔
816
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
18,530,945✔
817
  if (pMsg->code) {
18,530,945✔
818
    *pWorkerIdx = 0;
183,307✔
819
    return TSDB_CODE_SUCCESS;
183,307✔
820
  }
821
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
18,347,638✔
822
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
18,347,638✔
823
  return TSDB_CODE_SUCCESS;
18,347,638✔
824
}
825

826

827
/*
 * Queue handler for the dnode stream-mgmt workers.
 *
 * Processes one message taken from the queue, sends a response when the
 * message is a request, and finally releases the message content and the
 * queue item.
 *
 * pInfo  queue info; its ahandle is the SDnodeMgmt the queue was allocated with
 * pMsg   the message to process; freed before returning
 */
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  // NOTE(review): the dGTrace/dGError macros presumably read a local named
  // `trace`; keep this exact name -- confirm against the macro definitions.
  STraceId   *trace = &pMsg->info.traceId;
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;

  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  // Only stream heartbeat responses are handled by this queue.
  if (pMsg->msgType == TDMT_MND_STREAM_HEARTBEAT_RSP) {
    code = dmProcessStreamHbRsp(pMgmt, pMsg);
  } else {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
  }

  if (IsReq(pMsg)) {
    // On failure, report terrno instead of the handler's code when terrno is set.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }

    SRpcMsg rsp = {0};
    rsp.code = code;
    rsp.pCont = pMsg->info.rsp;
    rsp.contLen = pMsg->info.rspLen;
    rsp.info = pMsg->info;

    // From here on `code` holds the result of sending the response.
    code = rpcSendResponse(&rsp);
    if (code != 0) {
      dError("failed to send response since %s", tstrerror(code));
    }
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
18,530,945✔
862

863

864
/*
 * Start the workers owned by the dnode mgmt module:
 *   1. the single "dnode-mgmt" worker, fed through dmProcessMgmtQueue;
 *   2. the "dnode-stream-mgmt" dispatch pool, sized by tsNumOfStreamMgmtThreads,
 *      whose queues are handled by dmProcessStreamMgmtQueue and routed with
 *      dmDispatchStreamHbMsg.
 *
 * Returns 0 on success, otherwise the code of the first step that failed.
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .name = "dnode-mgmt",
      .min = 1,
      .max = 1,
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  int32_t code = tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg);
  if (code != 0) {
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;
  pStreamPool->max = tsNumOfStreamMgmtThreads;
  pStreamPool->name = "dnode-stream-mgmt";

  if ((code = tDispatchWorkerInit(pStreamPool)) != 0) {
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  if ((code = tDispatchWorkerAllocQueue(pStreamPool, pMgmt, (FItem)dmProcessStreamMgmtQueue,
                                        dmDispatchStreamHbMsg)) != 0) {
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
    return code;
  }

  dDebug("dnode workers are initialized");
  return 0;
}
895

896
/*
 * Shut down the dnode mgmt workers: the single mgmt worker is cleaned up
 * first, then the stream-mgmt dispatch pool.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker       *pMgmtWorker = &pMgmt->mgmtWorker;
  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;

  tSingleWorkerCleanup(pMgmtWorker);
  tDispatchWorkerCleanup(pStreamPool);

  dDebug("dnode workers are closed");
}
670,627✔
901

902
/*
 * Enqueue a message on the dnode-mgmt worker's queue.
 *
 * Returns whatever taosWriteQitem returns for the write.
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);

  int32_t writeCode = taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
  return writeCode;
}
907

908
/*
 * Hand a message to the stream-mgmt dispatch pool.
 *
 * Returns whatever tAddTaskIntoDispatchWorkerPool returns.
 */
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDispatchWorkerPool *pStreamPool = &pMgmt->streamMgmtWorker;

  return tAddTaskIntoDispatchWorkerPool(pStreamPool, pMsg);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc