• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.57
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18
#include "streamMsg.h"
19
#include "stream.h"
20
#include "streamReader.h"
21

22
#define PROCESS_THRESHOLD (2000 * 1000)
23

24
/*
 * Take a reference on the mnode management object.
 *
 * The stopped flag is examined and the reference counter is incremented while
 * the read side of pMgmt->lock is held.
 *
 * Returns 0 when the reference was taken, or TSDB_CODE_MNODE_STOPPED when
 * pMgmt->stopped is set (in which case refCount is left untouched).
 */
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  if (pMgmt->stopped) {
    (void)taosThreadRwlockUnlock(&pMgmt->lock);
    return TSDB_CODE_MNODE_STOPPED;
  }

  (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return 0;
}
35

36
/*
 * Drop one reference previously obtained through mmAcquire().
 * The counter is decremented atomically while the read side of pMgmt->lock
 * is held; return values of the lock calls are deliberately ignored.
 */
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  {
    // the decrement itself is atomic; the lock is only held around it
    (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
386,561,154✔
41

42
/*
 * Send the response for pMsg with the given result code.
 * The response body and length are the ones already stored in pMsg->info
 * (info.rsp / info.rspLen); the connection info is copied from the request.
 */
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};

  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;

  tmsgSendRsp(&rsp);
}
162,438,272✔
51

52
/*
 * Queue callback for the generic mnode workers.
 *
 * Hands the message to mndProcessRpcMsg(), warns when the time elapsed since
 * pInfo->timestamp exceeds PROCESS_THRESHOLD (microseconds), then either sends
 * the response or discards the prepared response buffer, and finally frees the
 * request body and the queue item.
 */
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // a zero timestamp means no timing information is available for this item
  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // a response is sent only for requests that still have a handle and are not in progress
  bool needRsp = IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS;
  if (!needRsp) {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    // prefer the more specific terrno when the handler failed and left one behind
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
322,109,119✔
85

86
/*
 * Queue callback for the mnode sync workers.
 * Converts the message head (contLen, vgId) with ntohl() in place, passes the
 * message to mndProcessSyncMsg(), then frees the body and the queue item.
 */
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  SMsgHead       *pHead = pMsg->pCont;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // head fields are converted with ntohl() before the message is processed
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);
  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
64,449,355✔
103

104
/*
 * Queue callback for the mnode stream-mgmt dispatch workers.
 * Runs mndProcessStreamHb() on the message (its result is ignored) and then
 * frees the message body and the queue item.
 */
static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
18,577,750✔
117

118

119
/*
 * Enqueue pMsg on pWorker->queue while holding a reference on pMgmt.
 *
 * Returns the mmAcquire() error when the reference cannot be taken (the
 * message is not enqueued), otherwise the result of taosWriteQitem().
 * The enqueue is logged at info level when tsSyncLogHeartbeat is set and at
 * trace level otherwise.
 */
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  if (tsSyncLogHeartbeat) {
    dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  } else {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  }

  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
138

139
/* Enqueue pMsg on the mnode write worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
142

143
/* Enqueue pMsg on the mnode arb worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
146

147
/* Enqueue pMsg on the mnode audit worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToAuditQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->auditWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
150

151
/* Enqueue pMsg on the mnode sync worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
154

155
/* Enqueue pMsg on the mnode sync-rd worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
158

159
/* Enqueue pMsg on the mnode read worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
162

163
/* Enqueue pMsg on the mnode status worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
166

167
/*
 * Pre-process a query message and enqueue it on the matching worker.
 *
 * mndPreProcessQueryMsg() reports the task type: TASK_TYPE_QUERY goes to the
 * query worker, TASK_TYPE_HQUERY to the mquery worker.
 *
 * Returns TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the
 * pre-processing error if that step fails, TSDB_CODE_INVALID_PARA for any other
 * task type, otherwise the mmPutMsgToWorker() result.
 */
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         qType = 0;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;
  code = mndPreProcessQueryMsg(pMsg, &qType);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // choose the destination worker from the reported task type
  SSingleWorker *pWorker = NULL;
  if (qType == TASK_TYPE_QUERY) {
    pWorker = &pMgmt->queryWorker;
  } else if (qType == TASK_TYPE_HQUERY) {
    pWorker = &pMgmt->mqueryWorker;
  }

  if (pWorker == NULL) {
    code = TSDB_CODE_INVALID_PARA;
    dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
196

197
/* Enqueue pMsg on the mnode fetch worker; returns the mmPutMsgToWorker() result. */
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
200

201
/*
 * Hand pMsg to the stream-mgmt dispatch worker pool.
 * Unlike the single-worker helpers in this file, this path does not go through
 * mmAcquire()/mmRelease(); the pool's return code is passed back unchanged.
 */
int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t code = tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorkerPool, pMsg);
  return code;
}
204

205
/*
 * Enqueue pMsg on the stream-reader queue (pMgmt->pStreamReaderQ) while
 * holding a reference on pMgmt.
 * Returns the mmAcquire() error if the reference cannot be taken, otherwise
 * the taosWriteQitem() result.
 */
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
  mmRelease(pMgmt);
  return code;
}
219

220

221
/*
 * Copy an RPC message into a freshly allocated queue item and enqueue it on
 * the worker selected by qtype.
 *
 * Ownership of pRpc->pCont moves to the queue item once the copy is made
 * (pRpc->pCont is set to NULL). If the enqueue then fails, the body and the
 * item are freed here. For an unknown qtype or an allocation failure pRpc is
 * left untouched.
 *
 * Returns TSDB_CODE_INVALID_PARA for an unknown qtype, the allocation error,
 * or the mmPutMsgToWorker() result.
 */
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pWorker = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pWorker = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    case AUDIT_QUEUE:
      pWorker = &pMgmt->auditWorker;
      break;
    default:
      // no worker is associated with this queue type
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pMsg;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) {
    return code;
  }

  // shallow copy; the body pointer now belongs to the queue item
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code == 0) {
    return code;
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return code;
}
276

277
int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
18,577,750✔
278
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
18,577,750✔
279
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
18,577,750✔
280
  *pWorkerIdx = pHeader->streamGid % tsNumOfMnodeStreamMgmtThreads;
18,577,750✔
281
  return TSDB_CODE_SUCCESS;
18,577,750✔
282
}
283

284
/*
 * Handle a TDMT_STREAM_FETCH request on the mnode.
 *
 * Steps:
 *   1. Deserialize the SResFetchReq and look up the reader calc-info list for
 *      (queryId, taskId); pick the entry at req.execId.
 *   2. If the request asks for a reset, or no exec task exists yet, destroy the
 *      old task and build a new one from the stored calc scan plan.
 *   3. Execute the task, optionally filter each result block, and serialize
 *      the blocks into a fetch response buffer.
 *   4. Always send a TDMT_STREAM_FETCH_RSP carrying the result code (with an
 *      empty body on failure) and release the request resources.
 *
 * Returns 0 on success or the first error code encountered.
 *
 * The STREAM_CHECK_* macros use the locals `code`/`lino` and jump to `end`;
 * ST_TASK_DLOG uses the local `pTask`.
 */
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // the runtime function info is dereferenced below; reject a request without it
    // before touching the existing task
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // clear the stale handle so a failed rebuild cannot leave a dangling pointer
    // in the calc info, which outlives this request
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    // with a dynamic table name, the uid comes from the partition value flagged as tbname
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    //initStorageAPI(&handle.api);

    // swap the request's function info into the calc info; whatever was stored
    // before ends up in req and is released by tDestroySResFetchReq() below
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streemFetch", ((SStreamTask*)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask*)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end:", __func__);

end:
  // only the pointer array is destroyed here; the blocks themselves are not freed by this function
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  // the response is sent on both success and failure; buf/size stay NULL/0 on failure
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
370

371
/*
 * Process one message taken from the mnode stream-reader queue.
 * Only TDMT_STREAM_FETCH is supported; any other message type is logged and
 * reported as TSDB_CODE_APP_ERROR.
 * Returns 0 on success or the failing code (also logged with the line number).
 * TAOS_CHECK_EXIT relies on the locals `code`/`lino` and the `_exit` label.
 */
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t         lino = 0;
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType != TDMT_STREAM_FETCH) {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  } else {
    TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));
  }

_exit:

  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
394

395

396
/*
 * Batch callback for the stream-reader worker pool.
 * Makes numOfMsgs attempts to pull an item from qall; each item obtained is
 * processed with mmProcessStreamReaderMsg() (terrno is reset beforehand) and
 * then its body and queue item are freed. Attempts that yield no item are
 * skipped.
 */
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pMsg = NULL;
  int32_t     remaining = numOfMsgs;

  while (remaining-- > 0) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;
    dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

    terrno = 0;
    int32_t code = mmProcessStreamReaderMsg(pMgmt, pMsg);

    dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
×
413

414

415
/*
 * Create every worker used by the mnode management layer.
 *
 * Single workers, in creation order: query, mquery, fetch, read, status,
 * write, sync, sync-rd, arb, audit. Afterwards the stream-mgmt dispatch pool
 * and the stream-reader pool with its queue are set up.
 * tsNumOfQueryThreads is increased by the number of query and mquery threads.
 *
 * Returns 0 on success. On the first failure the error is logged and returned;
 * workers created before the failure are NOT cleaned up here.
 */
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  // thread count of the mquery worker; previously repeated as a bare literal
  const int32_t numOfMqueryThreads = 4;

  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = numOfMqueryThreads,
      .max = numOfMqueryThreads,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += numOfMqueryThreads;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg auditCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-audit",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->auditWorker, &auditCfg)) != 0) {
    dError("failed to start mnode mnode-audit worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) {
    // log like every other worker so a start-up failure here is not silent
    dError("failed to start mnode-st-reader worker since %s", tstrerror(code));
    return code;
  }

  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(&pMgmt->streamReaderPool, pMgmt, mmProcessStreamReaderQueue);
  if (pMgmt->pStreamReaderQ == NULL) {
    // without this check a failed allocation would be reported as success and
    // mmPutMsgToStreamReaderQueue() would later write to a NULL queue
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    dError("failed to alloc mnode-st-reader queue since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
567

568
/*
 * Shut down all mnode workers.
 * First polls every 10 ms until pMgmt->refCount is no longer positive, then
 * cleans up each worker/pool in a fixed order.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  // wait for outstanding references taken via mmAcquire() to be released
  for (;;) {
    if (pMgmt->refCount <= 0) {
      break;
    }
    taosMsleep(10);
  }

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->mqueryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);

  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);

  // the stream-reader queue is freed before its pool is torn down
  tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(&pMgmt->streamReaderPool);

  tSingleWorkerCleanup(&pMgmt->auditWorker);

  dDebug("mnode workers are closed");
}
488,269✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc