• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.57
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18
#include "streamMsg.h"
19
#include "stream.h"
20
#include "streamReader.h"
21

22
#define PROCESS_THRESHOLD (2000 * 1000)
23

24
// Take one reference on the mnode management object.
// Under the read lock: if pMgmt->stopped is set, no reference is taken and
// TSDB_CODE_MNODE_STOPPED is returned; otherwise refCount is atomically
// incremented and 0 is returned.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t result = 0;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  } else {
    result = TSDB_CODE_MNODE_STOPPED;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return result;
}
35

36
// Drop one reference on the mnode management object: refCount is atomically
// decremented while the read lock is held. Counterpart of mmAcquire().
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
319,946,427✔
41

42
// Build a response for pMsg and hand it to tmsgSendRsp().
// The response carries the given code, the payload stored in pMsg->info.rsp
// (with its length rspLen) and a copy of the request's info structure.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};

  rsp.info = pMsg->info;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.code = code;

  tmsgSendRsp(&rsp);
}
131,547,364✔
51

52
// Worker callback for the generic mnode queues.
// Processes one message through mndProcessRpcMsg(), warns when the time since
// pInfo->timestamp exceeds PROCESS_THRESHOLD, sends a response for requests
// that have a handle and are not still in progress, then frees the message.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // A zero timestamp means no start time was recorded for this item.
  if (pInfo->timestamp != 0) {
    int64_t elapsedUs = taosGetTimestampUs() - pInfo->timestamp;
    if (elapsedUs > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), elapsedUs / (1000 * 1000));
    }
  }

  if (!IsReq(pMsg) || pMsg->info.handle == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    // No response is sent from here, so the prepared response buffer is released.
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    // Prefer the more specific terrno when the handler reported a failure.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
262,843,917✔
85

86
// Worker callback for the mnode sync queues.
// Converts the SMsgHead fields at the start of the payload with ntohl(),
// passes the message to mndProcessSyncMsg() and then frees it. The result
// code is only logged.
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  SMsgHead       *pHead = pMsg->pCont;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  pHead->vgId = ntohl(pHead->vgId);
  pHead->contLen = ntohl(pHead->contLen);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
57,090,619✔
103

104
// Worker callback for the mnode-stream-mgmt queue.
// Hands the message to mndProcessStreamHb() (its return value is deliberately
// ignored) and then frees the payload and the queue item.
static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SMnodeMgmt     *pMgmt = pInfo->ahandle;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
14,513,271✔
117

118

119
// Enqueue pMsg on the given single worker's queue.
// A reference on pMgmt is held (mmAcquire/mmRelease) for the duration of the
// write. Returns the mmAcquire() error if the reference cannot be taken,
// otherwise the result of taosWriteQitem(). The message is not freed here.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // Same message at two levels: info when tsSyncLogHeartbeat is set, trace otherwise.
  if (tsSyncLogHeartbeat) {
    dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  } else {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  }

  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
138

139
// Enqueue pMsg on the mnode write worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
142

143
// Enqueue pMsg on the mnode arb worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
146

147
// Enqueue pMsg on the mnode audit worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToAuditQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->auditWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
150

151
// Enqueue pMsg on the mnode sync worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
154

155
// Enqueue pMsg on the mnode sync-rd worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
158

159
// Enqueue pMsg on the mnode read worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
162

163
// Enqueue pMsg on the mnode status worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
166

167
// Pre-process a query message and enqueue it on the matching worker.
// Returns TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the error from
// mndPreProcessQueryMsg() when pre-processing fails, TSDB_CODE_INVALID_PARA
// when the reported task type is neither TASK_TYPE_QUERY nor TASK_TYPE_HQUERY,
// and otherwise the result of mmPutMsgToWorker().
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         qType = 0;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;
  code = mndPreProcessQueryMsg(pMsg, &qType);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // Pick the destination worker from the task type reported by pre-processing.
  SSingleWorker *pWorker = NULL;
  if (qType == TASK_TYPE_QUERY) {
    pWorker = &pMgmt->queryWorker;
  } else if (qType == TASK_TYPE_HQUERY) {
    pWorker = &pMgmt->mqueryWorker;
  }

  if (pWorker != NULL) {
    return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  }

  code = TSDB_CODE_INVALID_PARA;
  dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
          TMSG_INFO(pMsg->msgType));
  return code;
}
196

197
// Enqueue pMsg on the mnode fetch worker; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
200

201
// Submit pMsg to the stream-mgmt dispatch worker pool and return the result of
// tAddTaskIntoDispatchWorkerPool(). Unlike the single-worker helpers, this
// path does not go through mmAcquire()/mmRelease().
int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDispatchWorkerPool *pPool = &pMgmt->streamMgmtWorkerPool;
  return tAddTaskIntoDispatchWorkerPool(pPool, pMsg);
}
204

205
// Enqueue pMsg on the stream-reader queue (pMgmt->pStreamReaderQ).
// A reference on pMgmt is held for the duration of the write. Returns the
// mmAcquire() error if the reference cannot be taken, otherwise the result of
// taosWriteQitem().
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
  mmRelease(pMgmt);
  return code;
}
219

220

221
// Copy *pRpc into a freshly allocated queue item and enqueue it on the worker
// selected by qtype.
// Returns TSDB_CODE_INVALID_PARA for an unknown qtype and the allocation error
// if the queue item cannot be allocated (pRpc is untouched in both cases).
// Once the item is allocated, ownership of the payload moves to the copy
// (pRpc->pCont is set to NULL); if enqueueing then fails, the copy and the
// payload are freed here and the error is returned.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pWorker = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pWorker = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    case AUDIT_QUEUE:
      pWorker = &pMgmt->auditWorker;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pMsg = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) {
    return code;
  }

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;  // payload now belongs to the queued copy

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code == 0) {
    return code;
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return code;
}
276

277
int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
14,513,271✔
278
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
14,513,271✔
279
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
14,513,271✔
280
  *pWorkerIdx = pHeader->streamGid % tsNumOfMnodeStreamMgmtThreads;
14,513,271✔
281
  return TSDB_CODE_SUCCESS;
14,513,271✔
282
}
283

284
// Handle one TDMT_STREAM_FETCH request on the mnode.
//
// Steps: deserialize the request, look up the reader calc info for
// (queryId, taskId, execId), (re)create the execution task when the request
// asks for a reset or none exists yet, run the task, optionally filter the
// result blocks, and serialize them into a fetch response.
//
// A TDMT_STREAM_FETCH_RSP is always sent, on success and on failure; on
// failure it carries the error code and whatever buf/size hold at that point
// (both still zero unless streamBuildFetchRsp() itself set them).
//
// Returns 0 on success or the first error encountered.
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The runtime function info is dereferenced below; reject the request
    // before tearing down the existing task if it is missing.
    // NOTE(review): assumes the deserializer can leave this pointer NULL for
    // a malformed or short request - confirm against tDeserializeSResFetchReq.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Clear the handle so a failed re-creation cannot leave a dangling
    // pointer that a later, non-reset fetch would execute.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    int64_t uid = 0;
    if (req.dynTbname) {
      // Use the uid of the first partition value flagged as a table name.
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    //initStorageAPI(&handle.api);

    // Move the request's runtime function info into the calc info; the
    // previous contents end up in req and are released with it at the end.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streemFetch", ((SStreamTask*)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask*)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end:", __func__);

end:
  // Only the pointer array is destroyed here; the blocks it points to are not
  // freed by this function.
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
370

371
// Process one message taken from the mnode stream-reader queue.
// TDMT_STREAM_FETCH is forwarded to mmProcessStreamFetchMsg(); any other
// message type is logged and reported as TSDB_CODE_APP_ERROR.
// Returns 0 on success or the error code. The message is not freed here.
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t         lino = 0;
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType != TDMT_STREAM_FETCH) {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  } else {
    TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));
  }

_exit:

  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
394

395

396
// Batch worker callback for the stream-reader queue.
// Makes numOfMsgs attempts to pull an item from qall; each item obtained is
// processed with mmProcessStreamReaderMsg() and then freed. Attempts for which
// taosGetQitem() returns 0 are skipped.
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pMsg = NULL;

  int32_t remaining = numOfMsgs;
  while (remaining-- > 0) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;
    dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

    // Reset the thread's error state before handling each message.
    terrno = 0;
    int32_t code = mmProcessStreamReaderMsg(pMgmt, pMsg);

    dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
×
413

414

415
// Create every worker and queue used by the mnode management module:
// the single workers (query, mquery, fetch, read, status, write, sync,
// sync-rd, arb, audit), the stream-mgmt dispatch pool and the stream-reader
// pool with its queue.
//
// Returns 0 on success. On the first failure the error is logged and returned
// immediately; workers that were already started are not torn down here.
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t       code = 0;
  const int32_t numOfMqueryThreads = 4;  // fixed size of the mnode-mquery pool

  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  // Query-type pools add their thread count to the global query thread total.
  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = numOfMqueryThreads,
      .max = numOfMqueryThreads,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += numOfMqueryThreads;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg auditCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-audit",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->auditWorker, &auditCfg)) != 0) {
    dError("failed to start mnode mnode-audit worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) {
    dError("failed to start mnode-st-reader worker since %s", tstrerror(code));
    return code;
  }

  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(&pMgmt->streamReaderPool, pMgmt, mmProcessStreamReaderQueue);
  if (pMgmt->pStreamReaderQ == NULL) {
    // Without this check a failed allocation was reported as success and the
    // NULL queue was later handed to taosWriteQitem().
    // NOTE(review): assumes tWWorkerAllocQueue reports its failure reason via
    // terrno; fall back to out-of-memory if it did not set one.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc mnode-st-reader queue since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
567

568
// Shut down every worker created by mmStartWorker().
// First polls every 10 ms until refCount (raised by mmAcquire(), lowered by
// mmRelease()) is no longer positive, then cleans up each worker and pool.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) {
    taosMsleep(10);
  }

  // single workers
  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->mqueryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);

  // stream pools: the reader queue is released before its pool is cleaned up
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);
  tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(&pMgmt->streamReaderPool);

  tSingleWorkerCleanup(&pMgmt->auditWorker);

  dDebug("mnode workers are closed");
}
455,759✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc