• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.93
/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmInt.h"
18
#include "tgrant.h"
19
#include "thttp.h"
20
#include "streamMsg.h"
21
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
22
#include "taoskInt.h"
23
#endif
24

25
// Encryption key expiration constants
26
#define MILLISECONDS_PER_DAY (24 * 3600 * 1000)
27

28
static void *dmStatusThreadFp(void *param) {
556,134✔
29
  SDnodeMgmt *pMgmt = param;
556,134✔
30
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
31
  setThreadName("dnode-status");
556,134✔
32

33
  while (1) {
825,341,693✔
34
    taosMsleep(50);
825,897,827✔
35
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
825,897,827✔
36

37
    int64_t curTime = taosGetTimestampMs();
825,341,693✔
38
    if (curTime < lastTime) lastTime = curTime;
825,341,693✔
39
    float interval = curTime - lastTime;
825,341,693✔
40
    if (interval >= tsStatusIntervalMs) {
825,341,693✔
41
      dmSendStatusReq(pMgmt);
41,297,144✔
42
      lastTime = curTime;
41,297,144✔
43
    }
44
  }
45

46
  return NULL;
556,134✔
47
}
48

49
static void *dmConfigThreadFp(void *param) {
556,134✔
50
  SDnodeMgmt *pMgmt = param;
556,134✔
51
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
52
  setThreadName("dnode-config");
556,134✔
53
  while (1) {
11,098,640✔
54
    taosMsleep(50);
11,654,774✔
55
    if (pMgmt->pData->dropped || pMgmt->pData->stopped || tsConfigInited) break;
11,654,774✔
56

57
    int64_t curTime = taosGetTimestampMs();
11,098,640✔
58
    if (curTime < lastTime) lastTime = curTime;
11,098,640✔
59
    float interval = curTime - lastTime;
11,098,640✔
60
    if (interval >= tsStatusIntervalMs) {
11,098,640✔
61
      dmSendConfigReq(pMgmt);
558,011✔
62
      lastTime = curTime;
558,011✔
63
    }
64
  }
65
  return NULL;
556,134✔
66
}
67

68
static void *dmKeySyncThreadFp(void *param) {
556,134✔
69
  SDnodeMgmt *pMgmt = param;
1,112,063✔
70
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
71
  setThreadName("dnode-keysync");
556,134✔
72

73
  // Wait a bit before first sync attempt
74
  taosMsleep(3000);
556,134✔
75

76
  while (1) {
5,223,616✔
77
    taosMsleep(100);
5,779,750✔
78
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
5,779,750✔
79

80
    int64_t curTime = taosGetTimestampMs();
5,724,069✔
81
    if (curTime < lastTime) lastTime = curTime;
5,724,069✔
82
    float interval = curTime - lastTime;
5,724,069✔
83
    if (interval >= tsStatusIntervalMs) {
5,724,069✔
84
      // Sync keys periodically (every 30 seconds) or on first run
85
      if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_LOADED) {
1,076,113✔
86
        // Check if encryption keys are expired based on configured threshold
87
        int64_t keyExpirationThreshold = (int64_t)tsKeyExpirationDays * MILLISECONDS_PER_DAY;
38,148✔
88
        int64_t svrKeyAge = curTime - tsSvrKeyUpdateTime;
38,148✔
89
        int64_t dbKeyAge = curTime - tsDbKeyUpdateTime;
38,148✔
90

91
        if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
38,148✔
NEW
92
          const char *action = (strcmp(tsKeyExpirationStrategy, "ALARM") == 0) ? "warning" : "attempting reload";
×
NEW
93
          dWarn("encryption keys may be expired (threshold:%d days, strategy:%s), svrKeyAge:%" PRId64
×
94
                " days, dbKeyAge:%" PRId64 " days, %s",
95
                tsKeyExpirationDays, tsKeyExpirationStrategy, svrKeyAge / MILLISECONDS_PER_DAY,
96
                dbKeyAge / MILLISECONDS_PER_DAY, action);
97
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
98
          // Try to reload keys from file
UNCOV
99
          char masterKeyFile[PATH_MAX] = {0};
×
100
          char derivedKeyFile[PATH_MAX] = {0};
×
101
          snprintf(masterKeyFile, sizeof(masterKeyFile), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP,
×
102
                   TD_DIRSEP, TD_DIRSEP);
UNCOV
103
          snprintf(derivedKeyFile, sizeof(derivedKeyFile), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP,
×
104
                   TD_DIRSEP, TD_DIRSEP);
105

UNCOV
106
          char    svrKey[ENCRYPT_KEY_LEN + 1] = {0};
×
107
          char    dbKey[ENCRYPT_KEY_LEN + 1] = {0};
×
108
          char    cfgKey[ENCRYPT_KEY_LEN + 1] = {0};
×
109
          char    metaKey[ENCRYPT_KEY_LEN + 1] = {0};
×
110
          char    dataKey[ENCRYPT_KEY_LEN + 1] = {0};
×
111
          int32_t algorithm = 0;
×
112
          int32_t cfgAlgorithm = 0;
×
113
          int32_t metaAlgorithm = 0;
×
114
          int32_t fileVersion = 0;
×
115
          int32_t keyVersion = 0;
×
116
          int64_t createTime = 0;
×
117
          int64_t svrKeyUpdateTime = 0;
×
118
          int64_t dbKeyUpdateTime = 0;
×
119

120
          int32_t code =
UNCOV
121
              taoskLoadEncryptKeys(masterKeyFile, derivedKeyFile, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
×
122
                                   &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime, 
123
                                   &svrKeyUpdateTime, &dbKeyUpdateTime);
UNCOV
124
          if (code == 0) {
×
125
            // Update global variables with reloaded keys
UNCOV
126
            tstrncpy(tsSvrKey, svrKey, sizeof(tsSvrKey));
×
127
            tstrncpy(tsDbKey, dbKey, sizeof(tsDbKey));
×
128
            tstrncpy(tsCfgKey, cfgKey, sizeof(tsCfgKey));
×
129
            tstrncpy(tsMetaKey, metaKey, sizeof(tsMetaKey));
×
130
            tstrncpy(tsDataKey, dataKey, sizeof(tsDataKey));
×
131
            tsEncryptAlgorithmType = algorithm;
×
132
            tsCfgAlgorithm = cfgAlgorithm;
×
133
            tsMetaAlgorithm = metaAlgorithm;
×
134
            tsEncryptFileVersion = fileVersion;
×
135
            tsEncryptKeyVersion = keyVersion;
×
136
            tsEncryptKeyCreateTime = createTime;
×
137
            tsSvrKeyUpdateTime = svrKeyUpdateTime;
×
138
            tsDbKeyUpdateTime = dbKeyUpdateTime;
×
139

140
            // Check if keys are still expired after reload
UNCOV
141
            svrKeyAge = curTime - tsSvrKeyUpdateTime;
×
142
            dbKeyAge = curTime - tsDbKeyUpdateTime;
×
NEW
143
            if (svrKeyAge > keyExpirationThreshold || dbKeyAge > keyExpirationThreshold) {
×
NEW
144
              dError("encryption keys are still expired after reload (threshold:%d days), svrKeyAge:%" PRId64
×
145
                     " days, dbKeyAge:%" PRId64 " days, please rotate keys",
146
                     tsKeyExpirationDays, svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY);
147
            } else {
NEW
148
              dInfo("successfully reloaded encryption keys, svrKeyAge:%" PRId64 " days, dbKeyAge:%" PRId64
×
149
                    " days (threshold:%d days)",
150
                    svrKeyAge / MILLISECONDS_PER_DAY, dbKeyAge / MILLISECONDS_PER_DAY, tsKeyExpirationDays);
151
            }
152
          } else {
UNCOV
153
            dError("failed to reload encryption keys since %s", tstrerror(code));
×
154
          }
155
#endif
156
        }
157
      } else if (tsEncryptKeysStatus == TSDB_ENCRYPT_KEY_STAT_DISABLED) {
1,037,965✔
158
        dInfo("encryption keys are disabled, stopping key sync thread");
500,453✔
159
        break;
500,453✔
160
      } else {
161
        dmSendKeySyncReq(pMgmt);
537,512✔
162
      }
163
      lastTime = curTime;
575,660✔
164
    }
165
  }
166
  return NULL;
556,134✔
167
}
168

169
static void *dmStatusInfoThreadFp(void *param) {
556,134✔
170
  SDnodeMgmt *pMgmt = param;
556,134✔
171
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
172
  setThreadName("dnode-status-info");
556,134✔
173

174
  int32_t upTimeCount = 0;
556,134✔
175
  int64_t upTime = 0;
556,134✔
176

177
  while (1) {
838,975,129✔
178
    taosMsleep(50);
839,531,263✔
179
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
839,531,263✔
180

181
    int64_t curTime = taosGetTimestampMs();
838,975,129✔
182
    if (curTime < lastTime) lastTime = curTime;
838,975,129✔
183
    float interval = curTime - lastTime;
838,975,129✔
184
    if (interval >= tsStatusIntervalMs) {
838,975,129✔
185
      dmUpdateStatusInfo(pMgmt);
41,805,677✔
186
      lastTime = curTime;
41,805,677✔
187

188
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
41,805,677✔
189
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
467,633✔
190
        if (upTime > 0) tsDndUpTime = upTime;
467,633✔
191
      }
192
    }
193
  }
194

195
  return NULL;
556,134✔
196
}
197

198
#if defined(TD_ENTERPRISE)
199
SDmNotifyHandle dmNotifyHdl = {.state = 0};
200
#define TIMESERIES_STASH_NUM 5
201
static void *dmNotifyThreadFp(void *param) {
556,134✔
202
  SDnodeMgmt *pMgmt = param;
556,134✔
203
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
204
  setThreadName("dnode-notify");
556,134✔
205

206
  if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
556,134✔
UNCOV
207
    return NULL;
×
208
  }
209

210
  // calculate approximate timeSeries per second
211
  int64_t  notifyTimeStamp[TIMESERIES_STASH_NUM];
555,929✔
212
  int64_t  notifyTimeSeries[TIMESERIES_STASH_NUM];
555,929✔
213
  int64_t  approximateTimeSeries = 0;
556,134✔
214
  uint64_t nTotalNotify = 0;
556,134✔
215
  int32_t  head, tail = 0;
556,134✔
216

217
  bool       wait = true;
556,134✔
218
  int32_t    nDnode = 0;
556,134✔
219
  int64_t    lastNotify = 0;
556,134✔
220
  int64_t    lastFetchDnode = 0;
556,134✔
221
  SNotifyReq req = {0};
556,134✔
222
  while (1) {
46,271,347✔
223
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
46,827,481✔
224
    if (wait) tsem_wait(&dmNotifyHdl.sem);
46,271,347✔
225
    atomic_store_8(&dmNotifyHdl.state, 1);
46,271,347✔
226

227
    int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
46,271,347✔
228
    if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
46,271,347✔
229
      goto _skip;
46,271,347✔
230
    }
UNCOV
231
    int64_t current = taosGetTimestampMs();
×
UNCOV
232
    if (current - lastFetchDnode > 1000) {
×
233
      nDnode = dmGetDnodeSize(pMgmt->pData);
×
234
      if (nDnode < 1) nDnode = 1;
×
235
      lastFetchDnode = current;
×
236
    }
237
    if (req.dnodeId == 0 || req.clusterId == 0) {
×
UNCOV
238
      req.dnodeId = pMgmt->pData->dnodeId;
×
239
      req.clusterId = pMgmt->pData->clusterId;
×
240
    }
241

UNCOV
242
    if (current - lastNotify < 10) {
×
UNCOV
243
      int64_t nCmprTimeSeries = approximateTimeSeries / 100;
×
244
      if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
×
245
      if (remainTimeSeries > nCmprTimeSeries * 10) {
×
246
        taosMsleep(10);
×
247
      } else if (remainTimeSeries > nCmprTimeSeries * 5) {
×
248
        taosMsleep(5);
×
249
      } else {
250
        taosMsleep(2);
×
251
      }
252
    }
253

UNCOV
254
    SMonVloadInfo vinfo = {0};
×
UNCOV
255
    (*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
×
256
    req.pVloads = vinfo.pVloads;
×
257
    int32_t nVgroup = taosArrayGetSize(req.pVloads);
×
258
    int64_t nTimeSeries = 0;
×
259
    for (int32_t i = 0; i < nVgroup; ++i) {
×
260
      SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
×
261
      nTimeSeries += vload->nTimeSeries;
×
262
    }
263
    notifyTimeSeries[tail] = nTimeSeries;
×
UNCOV
264
    notifyTimeStamp[tail] = taosGetTimestampNs();
×
265
    ++nTotalNotify;
×
266

267
    approximateTimeSeries = 0;
×
UNCOV
268
    if (nTotalNotify >= TIMESERIES_STASH_NUM) {
×
269
      head = tail - TIMESERIES_STASH_NUM + 1;
×
270
      if (head < 0) head += TIMESERIES_STASH_NUM;
×
271
      int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
×
272
      int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
×
273
      if (tsDiff > 0) {
×
274
        if (timeDiff > 0 && timeDiff < 1e9) {
×
275
          approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
×
276
          if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
×
277
            dmSendNotifyReq(pMgmt, &req);
×
278
          }
279
        } else {
UNCOV
280
          dmSendNotifyReq(pMgmt, &req);
×
281
        }
282
      }
283
    } else {
UNCOV
284
      dmSendNotifyReq(pMgmt, &req);
×
285
    }
286
    if (++tail == TIMESERIES_STASH_NUM) tail = 0;
×
287

288
    tFreeSNotifyReq(&req);
×
UNCOV
289
    lastNotify = taosGetTimestampMs();
×
290
  _skip:
46,271,347✔
291
    if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
46,271,347✔
292
      wait = true;
46,265,090✔
293
      continue;
46,265,090✔
294
    }
295
    wait = false;
6,257✔
296
  }
297

298
  return NULL;
556,134✔
299
}
300
#endif
301

302
#ifdef USE_MONITOR
303
static void *dmMonitorThreadFp(void *param) {
556,134✔
304
  SDnodeMgmt *pMgmt = param;
556,134✔
305
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
306
  int64_t     lastTimeForBasic = taosGetTimestampMs();
556,134✔
307
  setThreadName("dnode-monitor");
556,134✔
308

309
  static int32_t TRIM_FREQ = 20;
310
  int32_t        trimCount = 0;
556,134✔
311

312
  while (1) {
211,385,143✔
313
    taosMsleep(200);
211,941,277✔
314
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
211,941,277✔
315

316
    int64_t curTime = taosGetTimestampMs();
211,385,143✔
317

318
    if (curTime < lastTime) lastTime = curTime;
211,385,143✔
319
    float interval = (curTime - lastTime) / 1000.0f;
211,385,143✔
320
    if (interval >= tsMonitorInterval) {
211,385,143✔
321
      (*pMgmt->sendMonitorReportFp)();
1,172,753✔
322
      (*pMgmt->monitorCleanExpiredSamplesFp)();
1,172,753✔
323
      lastTime = curTime;
1,172,753✔
324

325
      trimCount = (trimCount + 1) % TRIM_FREQ;
1,172,753✔
326
      if (trimCount == 0) {
1,172,753✔
327
        taosMemoryTrim(0, NULL);
2,292✔
328
      }
329
    }
330
    if (atomic_val_compare_exchange_8(&tsNeedTrim, 1, 0)) {
211,385,143✔
331
      taosMemoryTrim(0, NULL);
7,863,945✔
332
    }
333
  }
334

335
  return NULL;
556,134✔
336
}
337
#endif
338
#ifdef USE_AUDIT
339
static void *dmAuditThreadFp(void *param) {
556,134✔
340
  SDnodeMgmt *pMgmt = param;
556,134✔
341
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
342
  setThreadName("dnode-audit");
556,134✔
343

344
  while (1) {
422,140,547✔
345
    taosMsleep(100);
422,696,681✔
346
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
422,696,681✔
347

348
    int64_t curTime = taosGetTimestampMs();
422,140,547✔
349
    if (curTime < lastTime) lastTime = curTime;
422,140,547✔
350
    float interval = curTime - lastTime;
422,140,547✔
351
    if (interval >= tsAuditInterval) {
422,140,547✔
352
      (*pMgmt->sendAuditRecordsFp)();
8,164,623✔
353
      lastTime = curTime;
8,164,623✔
354
    }
355
  }
356

357
  return NULL;
556,134✔
358
}
359
#endif
360
#ifdef USE_REPORT
UNCOV
361
static void *dmCrashReportThreadFp(void *param) {
×
UNCOV
362
  int32_t     code = 0;
×
363
  SDnodeMgmt *pMgmt = param;
×
364
  int64_t     lastTime = taosGetTimestampMs();
×
365
  setThreadName("dnode-crashReport");
×
366
  char filepath[PATH_MAX] = {0};
×
367
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
×
368
  char     *pMsg = NULL;
×
369
  int64_t   msgLen = 0;
×
370
  TdFilePtr pFile = NULL;
×
371
  bool      truncateFile = false;
×
372
  int32_t   sleepTime = 200;
×
373
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
374
  int32_t   loopTimes = reportPeriodNum;
×
375

376
  STelemAddrMgmt mgt = {0};
×
UNCOV
377
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
378
  if (code != 0) {
×
379
    dError("failed to init telemetry since %s", tstrerror(code));
×
380
    return NULL;
×
381
  }
382
  code = initCrashLogWriter();
×
UNCOV
383
  if (code != 0) {
×
384
    dError("failed to init crash log writer since %s", tstrerror(code));
×
385
    return NULL;
×
386
  }
387

388
  while (1) {
UNCOV
389
    checkAndPrepareCrashInfo();
×
UNCOV
390
    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
×
391
      break;
×
392
    }
393
    if (loopTimes++ < reportPeriodNum) {
×
UNCOV
394
      taosMsleep(sleepTime);
×
395
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
396
      continue;
×
397
    }
398
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
UNCOV
399
    if (pMsg && msgLen > 0) {
×
400
      if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
401
        dError("failed to send crash report");
×
402
        if (pFile) {
×
403
          taosReleaseCrashLogFile(pFile, false);
×
404
          pFile = NULL;
×
405

406
          taosMsleep(sleepTime);
×
UNCOV
407
          loopTimes = 0;
×
408
          continue;
×
409
        }
410
      } else {
UNCOV
411
        dInfo("succeed to send crash report");
×
UNCOV
412
        truncateFile = true;
×
413
      }
414
    } else {
UNCOV
415
      dInfo("no crash info was found");
×
416
    }
417

UNCOV
418
    taosMemoryFree(pMsg);
×
419

420
    if (pMsg && msgLen > 0) {
×
UNCOV
421
      pMsg = NULL;
×
422
      continue;
×
423
    }
424

UNCOV
425
    if (pFile) {
×
UNCOV
426
      taosReleaseCrashLogFile(pFile, truncateFile);
×
427
      pFile = NULL;
×
428
      truncateFile = false;
×
429
    }
430

UNCOV
431
    taosMsleep(sleepTime);
×
UNCOV
432
    loopTimes = 0;
×
433
  }
434
  taosTelemetryDestroy(&mgt);
×
435

436
  return NULL;
×
437
}
438
#endif
439

440
static void *dmMetricsThreadFp(void *param) {
556,134✔
441
  SDnodeMgmt *pMgmt = param;
556,134✔
442
  int64_t     lastTime = taosGetTimestampMs();
556,134✔
443
  setThreadName("dnode-metrics");
556,134✔
444
  while (1) {
211,381,315✔
445
    taosMsleep(200);
211,937,449✔
446
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
211,937,449✔
447

448
    int64_t curTime = taosGetTimestampMs();
211,381,315✔
449
    if (curTime < lastTime) lastTime = curTime;
211,381,315✔
450
    float interval = (curTime - lastTime) / 1000.0f;
211,381,315✔
451
    if (interval >= tsMetricsInterval) {
211,381,315✔
452
      (*pMgmt->sendMetricsReportFp)();
1,172,573✔
453
      (*pMgmt->metricsCleanExpiredSamplesFp)();
1,172,573✔
454
      lastTime = curTime;
1,172,573✔
455
    }
456
  }
457
  return NULL;
556,134✔
458
}
459

460
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
556,134✔
461
  int32_t      code = 0;
556,134✔
462
  TdThreadAttr thAttr;
555,929✔
463
  (void)taosThreadAttrInit(&thAttr);
556,134✔
464
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
465
#ifdef TD_COMPACT_OS
466
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
467
#endif
468
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
469
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
470
    dError("failed to create status thread since %s", tstrerror(code));
×
471
    return code;
×
472
  }
473

474
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
475
  tmsgReportStartup("dnode-status", "initialized");
556,134✔
476
  return 0;
556,134✔
477
}
478

479
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
556,134✔
480
  int32_t      code = 0;
556,134✔
481
  TdThreadAttr thAttr;
555,929✔
482
  (void)taosThreadAttrInit(&thAttr);
556,134✔
483
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
484
#ifdef TD_COMPACT_OS
485
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
486
#endif
487
  if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
488
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
489
    dError("failed to create config thread since %s", tstrerror(code));
×
490
    return code;
×
491
  }
492

493
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
494
  tmsgReportStartup("config-status", "initialized");
556,134✔
495
  return 0;
556,134✔
496
}
497

498
int32_t dmStartKeySyncThread(SDnodeMgmt *pMgmt) {
556,134✔
499
  int32_t      code = 0;
556,134✔
500
  TdThreadAttr thAttr;
555,929✔
501
  (void)taosThreadAttrInit(&thAttr);
556,134✔
502
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
503
#ifdef TD_COMPACT_OS
504
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
505
#endif
506
  if (taosThreadCreate(&pMgmt->keySyncThread, &thAttr, dmKeySyncThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
507
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
508
    dError("failed to create key sync thread since %s", tstrerror(code));
×
509
    return code;
×
510
  }
511

512
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
513
  tmsgReportStartup("dnode-keysync", "initialized");
556,134✔
514
  return 0;
556,134✔
515
}
516

517
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
556,134✔
518
  int32_t      code = 0;
556,134✔
519
  TdThreadAttr thAttr;
555,929✔
520
  (void)taosThreadAttrInit(&thAttr);
556,134✔
521
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
522
#ifdef TD_COMPACT_OS
523
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
524
#endif
525
  if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
526
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
527
    dError("failed to create status Info thread since %s", tstrerror(code));
×
528
    return code;
×
529
  }
530

531
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
532
  tmsgReportStartup("dnode-status-info", "initialized");
556,134✔
533
  return 0;
556,134✔
534
}
535

536
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
556,134✔
537
  if (taosCheckPthreadValid(pMgmt->statusThread)) {
556,134✔
538
    (void)taosThreadJoin(pMgmt->statusThread, NULL);
556,134✔
539
    taosThreadClear(&pMgmt->statusThread);
556,134✔
540
  }
541
}
556,134✔
542

543
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
556,134✔
544
  if (taosCheckPthreadValid(pMgmt->configThread)) {
556,134✔
545
    (void)taosThreadJoin(pMgmt->configThread, NULL);
556,134✔
546
    taosThreadClear(&pMgmt->configThread);
556,134✔
547
  }
548
}
556,134✔
549

550
void dmStopKeySyncThread(SDnodeMgmt *pMgmt) {
556,134✔
551
  if (taosCheckPthreadValid(pMgmt->keySyncThread)) {
556,134✔
552
    (void)taosThreadJoin(pMgmt->keySyncThread, NULL);
556,134✔
553
    taosThreadClear(&pMgmt->keySyncThread);
556,134✔
554
  }
555
}
556,134✔
556

557
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
556,134✔
558
  if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
556,134✔
559
    (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
556,134✔
560
    taosThreadClear(&pMgmt->statusInfoThread);
556,134✔
561
  }
562
}
556,134✔
563
#ifdef TD_ENTERPRISE
564
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
556,134✔
565
  int32_t      code = 0;
556,134✔
566
  TdThreadAttr thAttr;
555,929✔
567
  (void)taosThreadAttrInit(&thAttr);
556,134✔
568
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
569
  if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
570
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
571
    dError("failed to create notify thread since %s", tstrerror(code));
×
572
    return code;
×
573
  }
574

575
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
576
  tmsgReportStartup("dnode-notify", "initialized");
556,134✔
577
  return 0;
556,134✔
578
}
579

580
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
556,134✔
581
  if (taosCheckPthreadValid(pMgmt->notifyThread)) {
556,134✔
582
    if (tsem_post(&dmNotifyHdl.sem) != 0) {
556,134✔
UNCOV
583
      dError("failed to post notify sem");
×
584
    }
585

586
    (void)taosThreadJoin(pMgmt->notifyThread, NULL);
556,134✔
587
    taosThreadClear(&pMgmt->notifyThread);
556,134✔
588
  }
589
  if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
556,134✔
UNCOV
590
    dError("failed to destroy notify sem");
×
591
  }
592
}
556,134✔
593
#endif
594
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
556,134✔
595
  int32_t      code = 0;
556,134✔
596
#ifdef USE_MONITOR
597
  TdThreadAttr thAttr;
555,929✔
598
  (void)taosThreadAttrInit(&thAttr);
556,134✔
599
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
600
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
601
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
602
    dError("failed to create monitor thread since %s", tstrerror(code));
×
603
    return code;
×
604
  }
605

606
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
607
  tmsgReportStartup("dnode-monitor", "initialized");
556,134✔
608
#endif
609
  return 0;
556,134✔
610
}
611

612
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
556,134✔
613
  int32_t      code = 0;
556,134✔
614
#ifdef USE_AUDIT  
615
  TdThreadAttr thAttr;
555,929✔
616
  (void)taosThreadAttrInit(&thAttr);
556,134✔
617
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
618
  if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
619
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
620
    dError("failed to create audit thread since %s", tstrerror(code));
×
621
    return code;
×
622
  }
623

624
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
625
  tmsgReportStartup("dnode-audit", "initialized");
556,134✔
626
#endif  
627
  return 0;
556,134✔
628
}
629

630
int32_t dmStartMetricsThread(SDnodeMgmt *pMgmt) {
556,134✔
631
  int32_t code = 0;
556,134✔
632
#ifdef USE_MONITOR
633
  TdThreadAttr thAttr;
555,929✔
634
  (void)taosThreadAttrInit(&thAttr);
556,134✔
635
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
556,134✔
636
  if (taosThreadCreate(&pMgmt->metricsThread, &thAttr, dmMetricsThreadFp, pMgmt) != 0) {
556,134✔
UNCOV
637
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
UNCOV
638
    dError("failed to create metrics thread since %s", tstrerror(code));
×
639
    return code;
×
640
  }
641

642
  (void)taosThreadAttrDestroy(&thAttr);
556,134✔
643
  tmsgReportStartup("dnode-metrics", "initialized");
556,134✔
644
#endif
645
  return 0;
556,134✔
646
}
647

648
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
556,134✔
649
#ifdef USE_MONITOR
650
  if (taosCheckPthreadValid(pMgmt->monitorThread)) {
556,134✔
651
    (void)taosThreadJoin(pMgmt->monitorThread, NULL);
556,134✔
652
    taosThreadClear(&pMgmt->monitorThread);
556,134✔
653
  }
654
#endif
655
}
556,134✔
656

657
void dmStopAuditThread(SDnodeMgmt *pMgmt) {
556,134✔
658
#ifdef USE_AUDIT
659
  if (taosCheckPthreadValid(pMgmt->auditThread)) {
556,134✔
660
    (void)taosThreadJoin(pMgmt->auditThread, NULL);
556,134✔
661
    taosThreadClear(&pMgmt->auditThread);
556,134✔
662
  }
663
#endif
664
}
556,134✔
665

666
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
556,134✔
667
  int32_t code = 0;
556,134✔
668
#ifdef USE_REPORT
669
  if (!tsEnableCrashReport) {
556,134✔
670
    return 0;
556,134✔
671
  }
672

UNCOV
673
  TdThreadAttr thAttr;
×
UNCOV
674
  (void)taosThreadAttrInit(&thAttr);
×
675
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
676
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
×
677
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
678
    dError("failed to create crashReport thread since %s", tstrerror(code));
×
679
    return code;
×
680
  }
681

UNCOV
682
  (void)taosThreadAttrDestroy(&thAttr);
×
UNCOV
683
  tmsgReportStartup("dnode-crashReport", "initialized");
×
684
#endif
685
  return 0;
×
686
}
687

688
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
556,134✔
689
#ifdef USE_REPORT
690
  if (!tsEnableCrashReport) {
556,134✔
691
    return;
556,134✔
692
  }
693

UNCOV
694
  if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
×
UNCOV
695
    (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
×
696
    taosThreadClear(&pMgmt->crashReportThread);
×
697
  }
698
#endif
699
}
700

701
void dmStopMetricsThread(SDnodeMgmt *pMgmt) {
556,134✔
702
  if (taosCheckPthreadValid(pMgmt->metricsThread)) {
556,134✔
703
    (void)taosThreadJoin(pMgmt->metricsThread, NULL);
556,134✔
704
    taosThreadClear(&pMgmt->metricsThread);
556,134✔
705
  }
706
}
556,134✔
707

708
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
72,126,704✔
709
  SDnodeMgmt *pMgmt = pInfo->ahandle;
72,126,704✔
710
  int32_t     code = -1;
72,126,704✔
711
  STraceId   *trace = &pMsg->info.traceId;
72,126,704✔
712
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
72,126,704✔
713

714
  switch (pMsg->msgType) {
72,126,704✔
715
    case TDMT_DND_CONFIG_DNODE:
73,180✔
716
      code = dmProcessConfigReq(pMgmt, pMsg);
73,180✔
717
      break;
73,180✔
UNCOV
718
    case TDMT_MND_AUTH_RSP:
×
UNCOV
719
      code = dmProcessAuthRsp(pMgmt, pMsg);
×
720
      break;
×
721
    case TDMT_MND_GRANT_RSP:
×
722
      code = dmProcessGrantRsp(pMgmt, pMsg);
×
723
      break;
×
724
    case TDMT_DND_CREATE_MNODE:
13,972✔
725
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
13,972✔
726
      break;
13,972✔
727
    case TDMT_DND_DROP_MNODE:
576✔
728
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
576✔
729
      break;
576✔
730
    case TDMT_DND_CREATE_QNODE:
4,179✔
731
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
4,179✔
732
      break;
4,179✔
733
    case TDMT_DND_DROP_QNODE:
394✔
734
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
394✔
735
      break;
394✔
736
    case TDMT_DND_CREATE_SNODE:
60,751✔
737
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
60,751✔
738
      break;
60,751✔
739
    case TDMT_DND_ALTER_SNODE:
112,167✔
740
      code = (*pMgmt->processAlterNodeFp)(SNODE, pMsg);
112,167✔
741
      break;
112,167✔
742
    case TDMT_DND_DROP_SNODE:
40,298✔
743
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
40,298✔
744
      break;
40,298✔
745
    case TDMT_DND_CREATE_BNODE:
20,341✔
746
      code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
20,341✔
747
      break;
20,341✔
748
    case TDMT_DND_DROP_BNODE:
20,339✔
749
      code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
20,339✔
750
      break;
20,339✔
751
    case TDMT_DND_ALTER_MNODE_TYPE:
170,462✔
752
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
170,462✔
753
      break;
170,462✔
754
    case TDMT_DND_SERVER_STATUS:
166✔
755
      code = dmProcessServerRunStatus(pMgmt, pMsg);
166✔
756
      break;
166✔
757
    case TDMT_DND_SYSTABLE_RETRIEVE:
23,636✔
758
      code = dmProcessRetrieve(pMgmt, pMsg);
23,636✔
759
      break;
23,636✔
760
    case TDMT_MND_GRANT:
709,035✔
761
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
709,035✔
762
      break;
709,035✔
763
    case TDMT_MND_GRANT_NOTIFY:
70,877,044✔
764
      code = dmProcessGrantNotify(NULL, pMsg);
70,877,044✔
765
      break;
70,877,044✔
766
    case TDMT_DND_CREATE_ENCRYPT_KEY:
164✔
767
      code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
164✔
768
      break;
164✔
UNCOV
769
    case TDMT_MND_ALTER_ENCRYPT_KEY:
×
UNCOV
770
      code = dmProcessAlterEncryptKeyReq(pMgmt, pMsg);
×
771
      break;
×
NEW
772
    case TDMT_MND_ALTER_KEY_EXPIRATION:
×
NEW
773
      code = dmProcessAlterKeyExpirationReq(pMgmt, pMsg);
×
NEW
774
      break;
×
775
    case TDMT_DND_RELOAD_DNODE_TLS:
×
776
      code = dmProcessReloadTlsConfig(pMgmt, pMsg);
×
777
      // code = dmProcessReloadEncryptKeyReq(pMgmt, pMsg);
778
      break;
×
779
    default:
×
780

781
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
UNCOV
782
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
783
      break;
×
784
  }
785

786
  if (IsReq(pMsg)) {
72,126,704✔
787
    if (code != 0 && terrno != 0) code = terrno;
72,126,704✔
788
    SRpcMsg rsp = {
144,247,968✔
789
        .code = code,
790
        .pCont = pMsg->info.rsp,
72,126,704✔
791
        .contLen = pMsg->info.rspLen,
72,126,704✔
792
        .info = pMsg->info,
793
    };
794

795
    code = rpcSendResponse(&rsp);
72,126,704✔
796
    if (code != 0) {
72,126,704✔
UNCOV
797
      dError("failed to send response since %s", tstrerror(code));
×
798
    }
799
  }
800

801
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
72,126,704✔
802
  rpcFreeCont(pMsg->pCont);
72,126,704✔
803
  taosFreeQitem(pMsg);
72,126,704✔
804
}
72,126,704✔
805

806
int32_t dmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
14,275,461✔
807
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
14,275,461✔
808
  if (pMsg->code) {
14,275,461✔
809
    *pWorkerIdx = 0;
164,587✔
810
    return TSDB_CODE_SUCCESS;
164,587✔
811
  }
812
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
14,110,874✔
813
  *pWorkerIdx = pHeader->streamGid % tsNumOfStreamMgmtThreads;
14,110,874✔
814
  return TSDB_CODE_SUCCESS;
14,110,874✔
815
}
816

817

818
static void dmProcessStreamMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
14,275,461✔
819
  SDnodeMgmt *pMgmt = pInfo->ahandle;
14,275,461✔
820
  int32_t     code = -1;
14,275,461✔
821
  STraceId   *trace = &pMsg->info.traceId;
14,275,461✔
822
  dGTrace("msg:%p, will be processed in dnode stream mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
14,275,461✔
823

824
  switch (pMsg->msgType) {
14,275,461✔
825
    case TDMT_MND_STREAM_HEARTBEAT_RSP:
14,275,461✔
826
      code = dmProcessStreamHbRsp(pMgmt, pMsg);
14,275,461✔
827
      break;
14,275,461✔
UNCOV
828
    default:
×
UNCOV
829
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
UNCOV
830
      dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
×
UNCOV
831
      break;
×
832
  }
833

834
  if (IsReq(pMsg)) {
14,275,461✔
835
    if (code != 0 && terrno != 0) code = terrno;
×
836
    SRpcMsg rsp = {
×
837
        .code = code,
UNCOV
838
        .pCont = pMsg->info.rsp,
×
UNCOV
839
        .contLen = pMsg->info.rspLen,
×
840
        .info = pMsg->info,
841
    };
842

843
    code = rpcSendResponse(&rsp);
×
844
    if (code != 0) {
×
UNCOV
845
      dError("failed to send response since %s", tstrerror(code));
×
846
    }
847
  }
848

849
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
14,275,461✔
850
  rpcFreeCont(pMsg->pCont);
14,275,461✔
851
  taosFreeQitem(pMsg);
14,275,461✔
852
}
14,275,461✔
853

854

855
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
556,134✔
856
  int32_t          code = 0;
556,134✔
857
  SSingleWorkerCfg cfg = {
556,134✔
858
      .min = 1,
859
      .max = 1,
860
      .name = "dnode-mgmt",
861
      .fp = (FItem)dmProcessMgmtQueue,
862
      .param = pMgmt,
863
  };
864
  if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
556,134✔
UNCOV
865
    dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
×
UNCOV
866
    return code;
×
867
  }
868

869
  SDispatchWorkerPool* pStMgmtpool = &pMgmt->streamMgmtWorker;
556,134✔
870
  pStMgmtpool->max = tsNumOfStreamMgmtThreads;
556,134✔
871
  pStMgmtpool->name = "dnode-stream-mgmt";
556,134✔
872
  code = tDispatchWorkerInit(pStMgmtpool);
556,134✔
873
  if (code != 0) {
556,134✔
UNCOV
874
    dError("failed to start dnode-stream-mgmt worker since %s", tstrerror(code));
×
UNCOV
875
    return code;
×
876
  }
877
  code = tDispatchWorkerAllocQueue(pStMgmtpool, pMgmt, (FItem)dmProcessStreamMgmtQueue, dmDispatchStreamHbMsg);
556,134✔
878
  if (code != 0) {
556,134✔
879
    dError("failed to allocate dnode-stream-mgmt worker queue since %s", tstrerror(code));
×
880
    return code;
×
881
  }
882

883
  dDebug("dnode workers are initialized");
556,134✔
884
  return 0;
556,134✔
885
}
886

887
void dmStopWorker(SDnodeMgmt *pMgmt) {
556,134✔
888
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
556,134✔
889
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorker);
556,134✔
890
  dDebug("dnode workers are closed");
556,134✔
891
}
556,134✔
892

893
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
72,126,704✔
894
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
72,126,704✔
895
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
72,126,704✔
896
  return taosWriteQitem(pWorker->queue, pMsg);
72,126,704✔
897
}
898

899
int32_t dmPutMsgToStreamMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
14,275,461✔
900
  return tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorker, pMsg);
14,275,461✔
901
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc