• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5052

13 May 2026 12:00PM UTC coverage: 73.338% (-0.02%) from 73.358%
#5052

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

761 existing lines in 163 files now uncovered.

281469 of 383795 relevant lines covered (73.34%)

134502812.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.57
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18
#include "streamMsg.h"
19
#include "stream.h"
20
#include "streamReader.h"
21

22
#define PROCESS_THRESHOLD (2000 * 1000)
23

24
/*
 * Take a reference on the mnode management object.
 *
 * While holding pMgmt->lock in read mode, refCount is atomically incremented
 * unless pMgmt->stopped is set.
 *
 * Returns 0 when the reference was taken, TSDB_CODE_MNODE_STOPPED otherwise.
 */
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t result = TSDB_CODE_MNODE_STOPPED;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
    result = 0;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return result;
}
35

36
/*
 * Drop a reference previously taken with mmAcquire(): refCount is atomically
 * decremented while pMgmt->lock is held in read mode.
 */
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  {
    // decrement happens strictly between lock and unlock
    (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
415,309,979✔
41

42
/*
 * Build a response for pMsg and hand it to tmsgSendRsp().
 *
 * The response carries the given code, the response buffer/length stored in
 * pMsg->info (rsp / rspLen) and a copy of pMsg->info itself; every other field
 * of the response is zero.
 */
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.info = pMsg->info;
  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;

  tmsgSendRsp(&reply);
}
177,039,418✔
51

52
/*
 * Queue callback for the generic mnode workers.
 *
 * Runs mndProcessRpcMsg() on the message, warns when processing took longer
 * than PROCESS_THRESHOLD microseconds (measured from pInfo->timestamp), sends
 * a response when one is due, and finally frees the message content and the
 * queue item.
 */
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // A response is sent only for requests that still have a handle and are not in progress.
  bool needRsp = IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS;
  if (!needRsp) {
    // nothing is sent from here, so release the prepared response buffer
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    if (code != 0 && terrno != 0) {
      code = terrno;  // prefer the more specific thread-local error
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
346,374,052✔
85

86
/*
 * Queue callback for the mnode sync workers.
 *
 * Converts contLen and vgId of the message head with ntohl(), passes the
 * message to mndProcessSyncMsg(), then frees the content and the queue item.
 */
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);
  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
68,927,532✔
103

104
/*
 * Queue callback for the mnode stream-mgmt workers.
 *
 * Passes the message to mndProcessStreamHb() (its result is ignored), then
 * frees the content and the queue item.
 */
static void mmProcessStreamHbMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-stream-mgmt queue", pMsg);

  (void)mndProcessStreamHb(pMsg);

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
19,911,740✔
117

118

119
/*
 * Write pMsg into the queue of pWorker while holding a reference on pMgmt.
 *
 * Returns the mmAcquire() error when the reference cannot be taken (the
 * message is not queued in that case), otherwise the result of
 * taosWriteQitem().
 */
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // tsSyncLogHeartbeat raises this log line from trace to info level
  if (tsSyncLogHeartbeat) {
    dGInfo("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  } else {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  }

  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
138

139
/* Queue pMsg on the mnode write worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
142

143
/* Queue pMsg on the mnode arb worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
146

147
/* Queue pMsg on the mnode audit worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToAuditQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->auditWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
150

151
/* Queue pMsg on the mnode sync worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
154

155
/* Queue pMsg on the mnode sync-rd worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
158

159
/* Queue pMsg on the mnode read worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
162

163
/* Queue pMsg on the mnode status worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
166

167
/*
 * Pre-process a query message and queue it on the query or mquery worker.
 *
 * mndPreProcessQueryMsg() reports the task type: TASK_TYPE_QUERY goes to the
 * query worker, TASK_TYPE_HQUERY to the mquery worker.
 *
 * Returns TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the
 * pre-processing error if that step fails, TSDB_CODE_INVALID_PARA for any
 * other task type, otherwise the result of mmPutMsgToWorker().
 */
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros
  int32_t         qType = 0;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;
  code = mndPreProcessQueryMsg(pMsg, &qType);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  // pick the destination worker from the task type
  SSingleWorker *pTarget = NULL;
  if (qType == TASK_TYPE_QUERY) {
    pTarget = &pMgmt->queryWorker;
  } else if (qType == TASK_TYPE_HQUERY) {
    pTarget = &pMgmt->mqueryWorker;
  }

  if (pTarget == NULL) {
    code = TSDB_CODE_INVALID_PARA;
    dGError("msg:%p, invalid task qType:%d, not put into (m)query queue, type:%s", pMsg, qType,
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
196

197
/* Queue pMsg on the mnode fetch worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pTarget = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
200

201
/*
 * Hand pMsg to the stream-mgmt dispatch worker pool; returns the result of
 * tAddTaskIntoDispatchWorkerPool(). Unlike the single-worker variants, no
 * mmAcquire()/mmRelease() pair is taken here.
 */
int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDispatchWorkerPool *pDispatchPool = &pMgmt->streamMgmtWorkerPool;
  return tAddTaskIntoDispatchWorkerPool(pDispatchPool, pMsg);
}
204

205
/*
 * Write pMsg into the stream reader queue (pMgmt->pStreamReaderQ) while
 * holding a reference on pMgmt.
 *
 * Returns the mmAcquire() error when the reference cannot be taken, otherwise
 * the result of taosWriteQitem().
 */
int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGDebug("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pMgmt->streamReaderPool.name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGDebug("msg:%p, put into %s queue, type:%s", pMsg, pMgmt->streamReaderPool.name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pMgmt->pStreamReaderQ, pMsg);
  mmRelease(pMgmt);
  return code;
}
219

220

221
/*
 * Copy pRpc into a freshly allocated queue item and queue it on the worker
 * selected by qtype.
 *
 * On successful allocation the content pointer moves to the queue item
 * (pRpc->pCont is set to NULL). If queuing then fails, the content and the
 * queue item are freed here.
 *
 * Returns TSDB_CODE_INVALID_PARA for an unsupported qtype, the allocation
 * error if taosAllocateQitem() fails, otherwise the result of
 * mmPutMsgToWorker().
 */
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pTarget = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      pTarget = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pTarget = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pTarget = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pTarget = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pTarget = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pTarget = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pTarget = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pTarget = &pMgmt->syncRdWorker;
      break;
    case AUDIT_QUEUE:
      pTarget = &pMgmt->auditWorker;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pItem = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pItem);
  if (code != 0) {
    return code;
  }

  memcpy(pItem, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;  // the queue item now holds the only pointer to the content

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pItem, pTarget->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pTarget, pItem);
  if (code == 0) {
    return code;
  }

  // queuing failed: release what was allocated/moved above
  dTrace("msg:%p, is freed", pItem);
  rpcFreeCont(pItem->pCont);
  taosFreeQitem(pItem);
  return code;
}
276

277
/*
 * Dispatch callback for the stream-mgmt worker pool.
 *
 * pParam is treated as an SRpcMsg whose content starts with an
 * SStreamMsgGrpHeader; the worker index written to *pWorkerIdx is
 * streamGid modulo tsNumOfMnodeStreamMgmtThreads. pPool is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t mmDispatchStreamHbMsg(struct SDispatchWorkerPool* pPool, void* pParam, int32_t *pWorkerIdx) {
  (void)pPool;

  SRpcMsg*             pRpc = pParam;
  SStreamMsgGrpHeader* pGrpHead = pRpc->pCont;

  *pWorkerIdx = pGrpHead->streamGid % tsNumOfMnodeStreamMgmtThreads;
  return TSDB_CODE_SUCCESS;
}
283

284
/*
 * Handle a TDMT_STREAM_FETCH request on the mnode.
 *
 * Steps:
 *   1. deserialize the SResFetchReq and look up the reader calc info selected
 *      by (queryId, taskId, execId);
 *   2. when req.reset is set or no exec task exists yet, (re)create the exec
 *      task from the stored calc scan plan;
 *   3. execute the task, optionally filter each result block, and build the
 *      fetch response buffer;
 *   4. always send a TDMT_STREAM_FETCH_RSP carrying the final code.
 *
 * Returns 0 on success or the first error code encountered.
 *
 * NOTE: the STREAM_CHECK_* / ST_TASK_DLOG macros rely on the locals `code`,
 * `lino`, `pTask` and on the `end` label, so those names must be kept.
 */
static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("mnode %s start", __func__);

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The runtime function info is dereferenced below; reject a request without it
    // before the existing task is torn down.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Never keep a pointer to the destroyed task: if re-creation fails below, a
    // later request must see "no task" rather than a dangling pointer.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    int64_t uid = 0;
    if (req.dynTbname) {
      // use the uid of the first partition value flagged as table name
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.mnd = pMgmt->pMnode;
    handle.uid = uid;
    handle.pMsgCb = &pMgmt->msgCb;

    // swap the request's function info into the calc info; the previous one is
    // left in req (tDestroySResFetchReq is called on req at the end)
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, MNODE_HANDLE,
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "streemFetch", ((SStreamTask*)pTask)->streamId);
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
      printDataBlock(pBlock, __func__, "fetch filter", ((SStreamTask*)pTask)->streamId);
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, TSDB_TIME_PRECISION_MILLI));
  ST_TASK_DLOG("%s end:", __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  // a response is sent on both the success and the error path
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
370

371
/*
 * Process one message taken from the mnode stream reader queue.
 *
 * Only TDMT_STREAM_FETCH is handled (via mmProcessStreamFetchMsg()); any
 * other message type is logged and fails with TSDB_CODE_APP_ERROR.
 *
 * Returns 0 on success or the error code, which is also logged.
 *
 * NOTE: TAOS_CHECK_EXIT relies on the locals `code`, `lino` and on the
 * `_exit` label.
 */
int32_t mmProcessStreamReaderMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
  int32_t         code = 0;
  int32_t         lino = 0;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

  if (pMsg->msgType != TDMT_STREAM_FETCH) {
    dError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }

  TAOS_CHECK_EXIT(mmProcessStreamFetchMsg(pMgmt, pMsg));

_exit:
  if (code != 0) {
    dError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
394

395

396
/*
 * Batch callback for the mnode stream reader queue.
 *
 * Makes numOfMsgs attempts to take an item from qall; each item obtained is
 * processed with mmProcessStreamReaderMsg() (terrno is cleared first) and then
 * its content and the queue item are freed.
 */
static void mmProcessStreamReaderQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pMsg = NULL;

  for (int32_t remaining = numOfMsgs; remaining > 0; --remaining) {
    if (taosGetQitem(qall, (void **)&pMsg) != 0) {
      const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* log macros
      dGDebug("msg:%p, get from mnode-stream-reader queue", pMsg);

      terrno = 0;
      int32_t code = mmProcessStreamReaderMsg(pMgmt, pMsg);

      dGDebug("msg:%p, is freed, code:0x%x [mmProcessStreamReaderQueue]", pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
    }
  }
}
×
413

414

415
/*
 * Create every worker and queue used by the mnode management module:
 * the query/mquery/fetch/read/status/write/sync/sync-rd/arb/audit single
 * workers, the stream-mgmt dispatch pool and the stream reader pool + queue.
 *
 * tsNumOfQueryThreads is increased by the number of query and mquery threads
 * that were started.
 *
 * Returns 0 on success. On the first failure the error is logged and returned
 * immediately; workers started before the failure are not cleaned up here.
 */
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg mqCfg = {
      .min = 4,
      .max = 4,
      .name = "mnode-mquery",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->mqueryWorker, &mqCfg)) != 0) {
    dError("failed to start mnode-mquery worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += 4;  // matches mqCfg.max above

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg auditCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-audit",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->auditWorker, &auditCfg)) != 0) {
    dError("failed to start mnode mnode-audit worker since %s", tstrerror(code));
    return code;
  }

  // stream-mgmt: dispatch pool whose worker is chosen per message by mmDispatchStreamHbMsg
  SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool;
  pPool->max = tsNumOfMnodeStreamMgmtThreads;
  pPool->name = "mnode-stream-mgmt";
  code = tDispatchWorkerInit(pPool);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }
  code = tDispatchWorkerAllocQueue(pPool, pMgmt, (FItem)mmProcessStreamHbMsg, mmDispatchStreamHbMsg);
  if (code != 0) {
    dError("failed to start mnode stream-mgmt worker since %s", tstrerror(code));
    return code;
  }

  // stream reader: batch worker pool with a single queue
  SWWorkerPool *pStreamReaderPool = &pMgmt->streamReaderPool;
  pStreamReaderPool->name = "mnode-st-reader";
  pStreamReaderPool->max = 2;
  if ((code = tWWorkerInit(pStreamReaderPool)) != 0) {
    dError("failed to start mnode-st-reader worker since %s", tstrerror(code));
    return code;
  }

  pMgmt->pStreamReaderQ = tWWorkerAllocQueue(pStreamReaderPool, pMgmt, mmProcessStreamReaderQueue);
  if (pMgmt->pStreamReaderQ == NULL) {
    // Do not report success with a NULL queue. Presumably the allocator reports the
    // cause through terrno (TODO confirm); fall back to a generic error otherwise.
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    dError("failed to alloc mnode-st-reader queue since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
567

568
/*
 * Shut down every worker created by mmStartWorker().
 *
 * Polls in 10 ms sleeps until pMgmt->refCount is no longer positive, then
 * cleans up the workers in a fixed order: the single workers (except audit),
 * the stream-mgmt dispatch pool, the stream reader queue and pool, and finally
 * the audit worker.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) {
    taosMsleep(10);
  }

  // cleaned up first, in exactly this order
  SSingleWorker *workers[] = {
      &pMgmt->queryWorker, &pMgmt->mqueryWorker, &pMgmt->fetchWorker,
      &pMgmt->readWorker,  &pMgmt->statusWorker, &pMgmt->writeWorker,
      &pMgmt->arbWorker,   &pMgmt->syncWorker,   &pMgmt->syncRdWorker,
  };
  for (size_t i = 0; i < sizeof(workers) / sizeof(workers[0]); ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool);

  tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ);
  tWWorkerCleanup(&pMgmt->streamReaderPool);

  tSingleWorkerCleanup(&pMgmt->auditWorker);

  dDebug("mnode workers are closed");
}
524,067✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc