• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4897

25 Dec 2025 10:17AM UTC coverage: 65.717% (-0.2%) from 65.929%
#4897

push

travis-ci

web-flow
fix: [6622889291] Fix invalid rowSize. (#34043)

186011 of 283047 relevant lines covered (65.72%)

113853896.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.8
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18
#include "streamMsg.h"
19
#include "stream.h"
20
#include "streamReader.h"
21

22
#define PROCESS_THRESHOLD (2000 * 1000)
23

24
// Takes one reference on the mnode management object unless it is stopped.
// The stopped flag is read and refCount is bumped while holding the read
// side of pMgmt->lock.
// Returns 0 when a reference was taken, TSDB_CODE_MNODE_STOPPED otherwise.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  const bool stopped = pMgmt->stopped;
  if (!stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return stopped ? TSDB_CODE_MNODE_STOPPED : 0;
}
35

36
// Drops one reference on the mnode management object. The decrement of
// refCount is performed while holding the read side of pMgmt->lock.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
303,417,987✔
41

42
// Builds a response from the reply payload stored in pMsg->info (rsp/rspLen)
// and hands it to tmsgSendRsp() with the given result code.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};

  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;

  tmsgSendRsp(&rsp);
}
119,973,874✔
51

52
// Queue callback for the RPC-style mnode workers.
// Runs mndProcessRpcMsg() on the message, warns when the time since
// pInfo->timestamp exceeds PROCESS_THRESHOLD, then either sends a response
// or frees the pending reply payload, and finally frees the message itself.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t elapsedUs = taosGetTimestampUs() - pInfo->timestamp;
    if (elapsedUs > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), elapsedUs / (1000 * 1000));
    }
  }

  // A response is sent only for requests that still have a connection handle
  // and whose processing is not reported as still in progress.
  bool needRsp = IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS;
  if (!needRsp) {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
237,801,088✔
85

86
// Queue callback for the mnode sync workers.
// Converts the message head's contLen and vgId with ntohl(), passes the
// message to mndProcessSyncMsg(), then frees the payload and the queue item.
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);
  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
65,606,703✔
103

104
// Queue callback for the stream-mgmt dispatch workers.
// Passes the message to mndProcessStreamHb() (result intentionally ignored),
// then frees the payload and the queue item.
static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
14,776,750✔
117

118

119
// Writes pMsg to pWorker's queue while holding a reference on pMgmt.
// Returns the mmAcquire() error when no reference could be taken, otherwise
// the result of taosWriteQitem().
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // Same message either way; only the log level depends on tsSyncLogHeartbeat.
  if (tsSyncLogHeartbeat) {
    dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  } else {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  }

  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
138

139
// Puts pMsg on the mnode write worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
142

143
// Puts pMsg on the mnode arb worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
146

147
// Puts pMsg on the mnode sync worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
150

151
// Puts pMsg on the mnode sync-rd worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
154

155
// Puts pMsg on the mnode read worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
158

159
// Puts pMsg on the mnode status worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
162

163
// Pre-processes a query message with mndPreProcessQueryMsg() and routes it to
// the query worker (TASK_TYPE_QUERY) or the mquery worker (TASK_TYPE_HQUERY).
// Returns TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the
// pre-processing error when that step fails, TSDB_CODE_INVALID_PARA for any
// other task type, otherwise mmPutMsgToWorker()'s result.
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros
  int32_t         qType = 0;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;
  code = mndPreProcessQueryMsg(pMsg, &qType);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // Select the destination worker from the task type reported by pre-processing.
  SSingleWorker *pWorker = NULL;
  if (qType == TASK_TYPE_QUERY) {
    pWorker = &pMgmt->queryWorker;
  } else if (qType == TASK_TYPE_HQUERY) {
    pWorker = &pMgmt->mqueryWorker;
  }

  if (pWorker != NULL) {
    return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  }

  code = TSDB_CODE_INVALID_PARA;
  dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
          TMSG_INFO(pMsg->msgType));
  return code;
}
192

193
// Puts pMsg on the mnode fetch worker queue; returns mmPutMsgToWorker()'s result.
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
196

197
// Hands pMsg to the stream-mgmt dispatch worker pool; returns the result of
// tAddTaskIntoDispatchWorkerPool().
int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDispatchWorkerPool *pPool = &pMgmt->streamMgmtWorkerPool;
  return tAddTaskIntoDispatchWorkerPool(pPool, pMsg);
}
200

201
// Writes pMsg to the stream-reader queue (pMgmt->pStreamReaderQ) while holding
// a reference on pMgmt. Returns the mmAcquire() error when no reference could
// be taken, otherwise the result of taosWriteQitem().
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
  mmRelease(pMgmt);
  return code;
}
215

216

217
// Copies *pRpc into a freshly allocated queue item and puts it on the worker
// selected by qtype.
// On successful allocation the payload pointer moves to the queue item and
// pRpc->pCont is set to NULL; if enqueueing then fails, the payload and the
// item are freed here.
// Returns TSDB_CODE_INVALID_PARA for an unknown qtype, the allocation error,
// or mmPutMsgToWorker()'s result.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pWorker = NULL;

  if (qtype == WRITE_QUEUE) {
    pWorker = &pMgmt->writeWorker;
  } else if (qtype == QUERY_QUEUE) {
    pWorker = &pMgmt->queryWorker;
  } else if (qtype == FETCH_QUEUE) {
    pWorker = &pMgmt->fetchWorker;
  } else if (qtype == READ_QUEUE) {
    pWorker = &pMgmt->readWorker;
  } else if (qtype == STATUS_QUEUE) {
    pWorker = &pMgmt->statusWorker;
  } else if (qtype == ARB_QUEUE) {
    pWorker = &pMgmt->arbWorker;
  } else if (qtype == SYNC_QUEUE) {
    pWorker = &pMgmt->syncWorker;
  } else if (qtype == SYNC_RD_QUEUE) {
    pWorker = &pMgmt->syncRdWorker;
  }

  if (pWorker == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pMsg = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) {
    return code;
  }

  // The queue item now carries the payload pointer; clear it in the source
  // message so only one of the two refers to it.
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code == 0) {
    return code;
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return code;
}
269

270
int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
14,776,750✔
271
  SRpcMsg* pMsg = (SRpcMsg*)pParam;
14,776,750✔
272
  SStreamMsgGrpHeader* pHeader = (SStreamMsgGrpHeader*)pMsg->pCont;
14,776,750✔
273
  *pWorkerIdx = pHeader->streamGid % tsNumOfMnodeStreamMgmtThreads;
14,776,750✔
274
  return TSDB_CODE_SUCCESS;
14,776,750✔
275
}
276

277
// Handles a TDMT_STREAM_FETCH request on the mnode.
//
// Steps:
//   1. Deserialize the request and look up the reader calc-info list for
//      (queryId, taskId); req.execId selects the entry to use.
//   2. When req.reset is set or no exec task exists yet, destroy the old task
//      and create a new one with qCreateStreamExecTaskInfo().
//   3. Run the task with qExecTaskOpt(), optionally filter each result block,
//      and serialize the blocks with streamBuildFetchRsp().
//   4. Always send a TDMT_STREAM_FETCH_RSP carrying the resulting code (and
//      the serialized buffer when one was built).
//
// The STREAM_CHECK_* macros rely on the locals `code` and `lino` and on the
// `end` label; ST_TASK_DLOG relies on the local `pTask`.
//
// Returns 0 on success or the first error code encountered.
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Clear the pointer right away: if re-creation below fails, the calc info
    // must not keep a dangling task pointer that the next fetch would destroy
    // a second time.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    // With a dynamic table name, take the uid from the first partition column
    // value that is flagged as the table name.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    //initStorageAPI(&handle.api);

    // Exchange the runtime function info held by the calc info with the one
    // carried by the request.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streemFetch", ((SStreamTask*)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask*)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end:", __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  // A response is sent on every path; on failure buf is NULL and size is 0.
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
363

364
// Processes one message taken from the mnode stream-reader queue.
// TDMT_STREAM_FETCH is forwarded to mmProcessStreamFetchMsg(); any other
// message type is logged and reported as TSDB_CODE_APP_ERROR.
// TAOS_CHECK_EXIT relies on the locals `code` and `lino` and on `_exit`.
// Returns 0 on success or the error code.
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t         code = 0;
  int32_t         lino = 0;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType != TDMT_STREAM_FETCH) {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  } else {
    TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));
  }

_exit:

  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
387

388

389
// Batch callback for the stream-reader worker pool.
// Takes up to numOfMsgs items from qall; each one is processed with
// mmProcessStreamReaderMsg() (terrno is reset first) and then freed.
// Iterations where taosGetQitem() returns 0 are skipped.
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pMsg = NULL;
  int32_t     idx = 0;

  while (idx < numOfMsgs) {
    ++idx;

    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros
    dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

    terrno = 0;
    int32_t code = mmProcessStreamReaderMsg(pMgmt, pMsg);

    dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
×
406

407

408
// Creates every worker used by the mnode management layer:
//   - nine single workers (query, mquery, fetch, read, status, write, sync,
//     sync-rd, arb),
//   - the stream-mgmt dispatch worker pool and its queue,
//   - the stream-reader worker pool and its queue.
// tsNumOfQueryThreads is increased by the thread counts of the query and
// mquery workers once each has been created.
//
// Returns 0 on success. On the first failure the error is logged and its code
// is returned immediately; workers created before that point are not torn
// down here.
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = 4,
      .max = 4,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += 4;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) {
    dError("failed to start mnode-st-reader worker since %s", tstrerror(code));
    return code;
  }

  // mmPutMsgToStreamReaderQueue() writes to this queue unconditionally, so a
  // failed allocation must be reported here instead of returning success.
  // NOTE(review): assumes tWWorkerAllocQueue() sets terrno on failure; the
  // fallback covers the case where it does not.
  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(pStreamReaderPool, pMgmt, mmProcessStreamReaderQueue);
  if (pMgmt->pStreamReaderQ == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc mnode-st-reader queue since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
548

549
// Shuts down every worker created by mmStartWorker().
// First polls in 10 ms steps until pMgmt->refCount is no longer positive,
// then cleans up the single workers, the stream-mgmt dispatch pool, and the
// stream-reader queue and pool, in that order.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) {
    taosMsleep(10);
  }

  // Cleanup order is significant only in that it mirrors the original sequence.
  SSingleWorker *singleWorkers[] = {
      &pMgmt->queryWorker, &pMgmt->mqueryWorker, &pMgmt->fetchWorker,
      &pMgmt->readWorker,  &pMgmt->statusWorker, &pMgmt->writeWorker,
      &pMgmt->arbWorker,   &pMgmt->syncWorker,   &pMgmt->syncRdWorker,
  };
  const size_t numSingleWorkers = sizeof(singleWorkers) / sizeof(singleWorkers[0]);
  for (size_t i = 0; i < numSingleWorkers; ++i) {
    tSingleWorkerCleanup(singleWorkers[i]);
  }

  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);

  SWWorkerPool *pReaderPool = &pMgmt->streamReaderPool;
  tWWorkerFreeQueue(pReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(pReaderPool);

  dDebug("mnode workers are closed");
}
511,861✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc