taosdata / TDengine — build #4907 (push · travis-ci · web-flow)
30 Dec 2025 10:52AM UTC — coverage: 65.541% (+0.03%) from 65.514%
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered (56.6%).
808 existing lines in 106 files now uncovered.
193,920 of 295,877 relevant lines covered (65.54%).
118,520,209.34 hits per line.

Source file: /source/dnode/mgmt/mgmt_mnode/src/mmWorker.c — 56.88% of lines covered
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "streamMsg.h"
#include "stream.h"
#include "streamReader.h"

#define PROCESS_THRESHOLD (2000 * 1000)  // 2 s, in microseconds

// Take a reference on the mnode mgmt handle; fails once shutdown has begun.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t code = 0;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (pMgmt->stopped) {
    code = TSDB_CODE_MNODE_STOPPED;
  } else {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return code;
}

// Drop a reference taken by mmAcquire.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
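
Note: mmAcquire/mmRelease form a read-lock-guarded reference count: the rwlock only orders acquirers against the shutdown flag, while the count itself is atomic. A minimal sketch of the calling pattern (mirroring mmPutMsgToWorker further down; doWorkWithMnode is a hypothetical stand-in):

/* Sketch only: users of the mnode handle bracket their work like this, so
 * mmStopWorker can wait for refCount to drain before tearing queues down. */
if (mmAcquire(pMgmt) == 0) {  /* fails with TSDB_CODE_MNODE_STOPPED after stop */
  doWorkWithMnode(pMgmt);     /* hypothetical work; must finish before release */
  mmRelease(pMgmt);
}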

static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {
      .code = code,
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
}

// Queue callback shared by most mnode workers: runs the handler, logs slow
// messages, and replies or frees the response buffer as appropriate.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d, message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
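
Note: pInfo->timestamp appears to be stamped when the item enters the queue, so cost covers queue wait plus handling, in microseconds. A worked example with assumed numbers:

/* Illustration (values assumed): a message picked up 3.5 s after it was
 * queued gives cost = 3,500,000 us > PROCESS_THRESHOLD (2,000,000 us), so
 * dGWarn fires and the integer division logs "cost: 3s". */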

static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
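
Note: the sync queue hands the handler the raw wire payload, so the SMsgHead fields arrive in network byte order and must be converted before mndProcessSyncMsg reads them. A self-contained illustration of the same conversion (not part of mmWorker.c):

#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>

int main(void) {
  uint32_t wire = htonl(256);   /* big-endian form, as produced by the peer */
  assert(ntohl(wire) == 256);   /* what the handler must read back */
  return 0;
}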

static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    if (tsSyncLogHeartbeat) {
      dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    } else {
      dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    }
    code = taosWriteQitem(pWorker->queue, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}

int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}

int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->arbWorker, pMsg);
}

int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}

int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
}

int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}

int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->statusWorker, pMsg);
}

int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t         code = 0;
  int32_t         qType = 0;
  const STraceId *trace = &pMsg->info.traceId;

  if (NULL == pMgmt->pMnode) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;
  if ((code = mndPreProcessQueryMsg(pMsg, &qType)) != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  if (qType == TASK_TYPE_QUERY) {
    return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
  } else if (qType == TASK_TYPE_HQUERY) {
    return mmPutMsgToWorker(pMgmt, &pMgmt->mqueryWorker, pMsg);
  } else {
    code = TSDB_CODE_INVALID_PARA;
    dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}

int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
}

int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return tAddTaskIntoDispatchWorkerPool(&pMgmt->streamMgmtWorkerPool, pMsg);
}

/* Coverage note: no hits recorded for this function in build #4907. */
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
    code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name,
            tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }
}

int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  int32_t code;

  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pWorker = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      code = TSDB_CODE_INVALID_PARA;
  }

  if (pWorker == NULL) return code;

  SRpcMsg *pMsg;
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
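
Note: mmPutMsgToQueue consumes pRpc->pCont unconditionally: the pointer moves into the freshly allocated queue item, the caller's copy is NULLed, and on a failed enqueue it is freed together with the item. A hedged caller sketch (pIncoming is hypothetical; the real call sites live elsewhere in the dnode code):

SRpcMsg rpc = *pIncoming;
int32_t code = mmPutMsgToQueue(pMgmt, WRITE_QUEUE, &rpc);
/* rpc.pCont is NULL here on success and already freed on failure, so the
 * caller must not release it in either case. */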

int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool *pPool, void *pParam, int32_t *pWorkerIdx) {
  SRpcMsg             *pMsg = (SRpcMsg *)pParam;
  SStreamMsgGrpHeader *pHeader = (SStreamMsgGrpHeader *)pMsg->pCont;
  *pWorkerIdx = pHeader->streamGid % tsNumOfMnodeStreamMgmtThreads;
  return TSDB_CODE_SUCCESS;
}
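
Note: the dispatch hook pins every heartbeat of a stream group to one fixed worker, which preserves per-group message order without locking. The same rule in isolation (thread count assumed, standalone name hypothetical):

/* With tsNumOfMnodeStreamMgmtThreads == 4, streamGid 10 always lands on
 * worker 10 % 4 == 2. */
static int32_t pickStreamWorker(int64_t streamGid, int32_t numThreads) {
  return (int32_t)(streamGid % numThreads);
}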

/* Coverage note: no hits recorded for this function in build #4907. */
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void        *buf = NULL;
  size_t       size = 0;
  SSDataBlock *pBlock = NULL;
  void        *taskAddr = NULL;
  SArray      *pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray *calcInfoList = (SArray *)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo *sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void *pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray *vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue *pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    //initStorageAPI(&handle.api);

    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock *pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streamFetch", ((SStreamTask *)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow && pBlock != NULL) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask *)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end", __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
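
Note: the STREAM_CHECK_*_GOTO macros (defined elsewhere in the tree) record the failing code and line and jump to end:, so the fetch response, the request teardown, and the task release run exactly once on every path. A minimal sketch of the idiom with a hypothetical macro, not the real definition:

#define CHECK_GOTO(cond, c) \
  do {                      \
    if (cond) {             \
      code = (c);           \
      lino = __LINE__;      \
      goto end;             \
    }                       \
  } while (0)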

/* Coverage note: no hits recorded for this function in build #4907. */
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));
  } else {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }

_exit:
  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}

/* Coverage note: no hits recorded for this function in build #4907. */
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMnode = pInfo->ahandle;
  SRpcMsg    *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;
    dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

    terrno = 0;
    int32_t code = mmProcessStreamReaderMsg(pMnode, pMsg);

    dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = 4,
      .max = 4,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += 4;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SDispatchWorkerPool *pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to init mnode-stream-mgmt worker pool since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to alloc queue for mnode-stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) return code;

  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(&pMgmt->streamReaderPool, pMgmt, mmProcessStreamReaderQueue);

  dDebug("mnode workers are initialized");
  return code;
}
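
Note: mmStartWorker returns on the first failed init without unwinding the workers it already started; cleanup is presumably left to the caller invoking mmStopWorker below. A hedged sketch of the assumed pairing (the real call site lives in the dnode mgmt code):

int32_t code = mmStartWorker(pMgmt);
if (code != 0) {
  mmStopWorker(pMgmt);  /* assumed safe on partially started workers */
  return code;
}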

void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) taosMsleep(10);

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->mqueryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);
  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);
  tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(&pMgmt->streamReaderPool);

  dDebug("mnode workers are closed");
}