taosdata / TDengine — build #4788 (push · travis-ci · web-flow)
Merge 7ca9b50f9 into 19574fe21
14 Oct 2025 11:21AM UTC — coverage: 60.992% (-2.3%) from 63.264%

154868 of 324306 branches covered (47.75%); branch coverage is included in the aggregate %.
207304 of 269498 relevant lines covered (76.92%)
125773493.22 hits per line

Source File
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c — 42.57% covered
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "streamMsg.h"
#include "stream.h"
#include "streamReader.h"

#define PROCESS_THRESHOLD (2000 * 1000)

// Take a reference on the mnode management handle; refused once shutdown has started.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t code = 0;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (pMgmt->stopped) {
    code = TSDB_CODE_MNODE_STOPPED;
  } else {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return code;
}

// Drop a reference taken by mmAcquire.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}

static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {
      .code = code,
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
}

static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    // Warn when handling took longer than PROCESS_THRESHOLD (2,000,000 us, i.e. 2 seconds).
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d, message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    if (tsSyncLogHeartbeat) {
      dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    } else {
      dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    }
    code = taosWriteQitem(pWorker->queue, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}
139
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
40,270,956✔
140
  return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
40,270,956✔
141
}
142

143
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
170,744✔
144
  return mmPutMsgToWorker(pMgmt, &pMgmt->arbWorker, pMsg);
170,744✔
145
}
146

147
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
6,607,024✔
148
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
6,607,024✔
149
}
150

151
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,104,126✔
152
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
5,104,126✔
153
}
154

155
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
81,421,672✔
156
  return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
81,421,672✔
157
}
158

159
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
39,734,426✔
160
  return mmPutMsgToWorker(pMgmt, &pMgmt->statusWorker, pMsg);
39,734,426✔
161
}
162

163
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,190,450✔
164
  int32_t         code = 0;
5,190,450✔
165
  int32_t         qType = 0;
5,190,450✔
166
  const STraceId *trace = &pMsg->info.traceId;
5,190,450✔
167

168
  if (NULL == pMgmt->pMnode) {
5,190,450!
169
    code = TSDB_CODE_MNODE_NOT_FOUND;
×
170
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
×
171
    return code;
×
172
  }
173

174
  pMsg->info.node = pMgmt->pMnode;
5,190,450✔
175
  if ((code = mndPreProcessQueryMsg(pMsg, &qType)) != 0) {
5,190,450!
176
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
×
177
            TMSG_INFO(pMsg->msgType));
178
    return code;
×
179
  }
180

181
  if (qType == TASK_TYPE_QUERY) {
5,190,450✔
182
    return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
648,862✔
183
  } else if (qType == TASK_TYPE_HQUERY) {
4,541,588!
184
    return mmPutMsgToWorker(pMgmt, &pMgmt->mqueryWorker, pMsg);
4,541,588✔
185
  } else {
186
    code = TSDB_CODE_INVALID_PARA;
×
187
    dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
×
188
            TMSG_INFO(pMsg->msgType));
189
    return code;
×
190
  }
191
}
192

193
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
15,051,240✔
194
  return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
15,051,240✔
195
}
196

int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorkerPool, pMsg);
}

// Not exercised in this coverage run (0 hits).
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
    code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}
216

217
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
87,495,511✔
218
  int32_t code;
219

220
  SSingleWorker *pWorker = NULL;
87,495,511✔
221
  switch (qtype) {
87,495,511!
222
    case WRITE_QUEUE:
32,578,312✔
223
      pWorker = &pMgmt->writeWorker;
32,578,312✔
224
      break;
32,578,312✔
225
    case QUERY_QUEUE:
242✔
226
      pWorker = &pMgmt->queryWorker;
242✔
227
      break;
242✔
228
    case FETCH_QUEUE:
×
229
      pWorker = &pMgmt->fetchWorker;
×
230
      break;
×
231
    case READ_QUEUE:
346✔
232
      pWorker = &pMgmt->readWorker;
346✔
233
      break;
346✔
234
    case STATUS_QUEUE:
×
235
      pWorker = &pMgmt->statusWorker;
×
236
      break;
×
237
    case ARB_QUEUE:
18,354,615✔
238
      pWorker = &pMgmt->arbWorker;
18,354,615✔
239
      break;
18,354,615✔
240
    case SYNC_QUEUE:
36,563,257✔
241
      pWorker = &pMgmt->syncWorker;
36,563,257✔
242
      break;
36,563,257✔
243
    case SYNC_RD_QUEUE:
×
244
      pWorker = &pMgmt->syncRdWorker;
×
245
      break;
508✔
246
    default:
×
247
      code = TSDB_CODE_INVALID_PARA;
×
248
  }
249

250
  if (pWorker == NULL) return code;
87,496,019!
251

252
  SRpcMsg *pMsg;
87,442,889✔
253
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
87,496,772✔
254
  if (code) return code;
87,495,640!
255

256
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
87,495,640!
257
  pRpc->pCont = NULL;
87,495,640✔
258

259
  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
87,495,511!
260
         pRpc->contLen);
261
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
87,495,511✔
262
  if (code != 0) {
87,495,556✔
263
    dTrace("msg:%p, is freed", pMsg);
3,604!
264
    rpcFreeCont(pMsg->pCont);
3,604✔
265
    taosFreeQitem(pMsg);
3,604✔
266
  }
267
  return code;
87,495,556✔
268
}
269

int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
  // Route by stream group id, so all heartbeats of one group land on the same worker thread.
  *pWorkerIdx = pHeader->streamGid % tsNumOfMnodeStreamMgmtThreads;
  return TSDB_CODE_SUCCESS;
}

// Not exercised in this coverage run (0 hits).
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSDataBlock*       pBlock = NULL;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    //initStorageAPI(&handle.api);

    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streemFetch", ((SStreamTask*)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow && pBlock != NULL) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask*)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end:", __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}

// Not exercised in this coverage run (0 hits).
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));
  } else {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }

_exit:

  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}

// Not exercised in this coverage run (0 hits).
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMnode = pInfo->ahandle;
  SRpcMsg   *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;
    dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

    terrno = 0;
    int32_t code = mmProcessStreamReaderMsg(pMnode, pMsg);

    dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = 4,
      .max = 4,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += 4;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) return code;

  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(&pMgmt->streamReaderPool, pMgmt, mmProcessStreamReaderQueue);

  dDebug("mnode workers are initialized");
  return code;
}

void mmStopWorker(SMnodeMgmt *pMgmt) {
  // Wait for all references taken via mmAcquire to be released before tearing down the queues.
  while (pMgmt->refCount > 0) taosMsleep(10);

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->mqueryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);
  tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(&pMgmt->streamReaderPool);

  dDebug("mnode workers are closed");
}
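
The mmAcquire/mmRelease/mmStopWorker trio above is a shutdown-gate idiom: a read-write lock guards the stopped flag (which is set elsewhere, outside this file), an atomic counter tracks in-flight users, and the stop path waits for that counter to drain before tearing anything down. Below is a minimal, self-contained sketch of the same idiom using plain pthreads and C11 atomics; the gate_* names are illustrative only and are not TDengine APIs.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <unistd.h>

typedef struct {
  pthread_rwlock_t lock;      /* guards the stopped flag */
  atomic_int       refCount;  /* in-flight users */
  bool             stopped;
} gate_t;

/* Mirrors mmAcquire: refuse new work once shutdown has begun. */
static int gate_acquire(gate_t *g) {
  int rc = 0;
  pthread_rwlock_rdlock(&g->lock);   /* shared lock: many acquirers in parallel */
  if (g->stopped) {
    rc = -1;
  } else {
    atomic_fetch_add(&g->refCount, 1);
  }
  pthread_rwlock_unlock(&g->lock);
  return rc;
}

/* Mirrors mmRelease: drop the reference taken by gate_acquire. */
static void gate_release(gate_t *g) {
  pthread_rwlock_rdlock(&g->lock);
  atomic_fetch_sub(&g->refCount, 1);
  pthread_rwlock_unlock(&g->lock);
}

/* Sketch of the stop path: flip the flag under the write lock so no acquire can
 * race it, then wait for users to drain, as mmStopWorker does before cleanup. */
static void gate_stop(gate_t *g) {
  pthread_rwlock_wrlock(&g->lock);
  g->stopped = true;
  pthread_rwlock_unlock(&g->lock);
  while (atomic_load(&g->refCount) > 0) usleep(10 * 1000);
}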