• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4131

20 May 2025 07:22AM UTC coverage: 63.096% (+0.7%) from 62.384%
#4131

push

travis-ci

web-flow
docs(datain): add topic meta options docs in tmq (#31147)

157751 of 318088 branches covered (49.59%)

Branch coverage included in aggregate %.

243052 of 317143 relevant lines covered (76.64%)

18743283.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.04
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
// Take one reference on the mnode management object under its read lock.
// Returns 0 on success, or TSDB_CODE_MNODE_STOPPED once shutdown has begun,
// so no new work can enter while mmStopWorker() is draining references.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t code = 0;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  } else {
    code = TSDB_CODE_MNODE_STOPPED;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return code;
}
32

33
// Drop one reference previously taken by mmAcquire(). The read lock pairs
// with mmAcquire() so refCount stays consistent with the stopped flag.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
4,375,882✔
38

39
// Build a response from the request's pre-allocated rsp buffer/handle and
// hand it to the transport layer.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};
  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;
  tmsgSendRsp(&rsp);
}
2,880,401✔
48

49
// Queue callback for all general-purpose mnode workers (query, fetch, read,
// status, write, arb): dispatches one dequeued RPC message to the mnode,
// replies to the caller when appropriate, and always frees the message.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // Warn when a message sat in processing longer than PROCESS_THRESHOLD
  // microseconds (cost is converted us -> s for the log line).
  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // Only answer requests that still expect a reply; ACTION_IN_PROGRESS means
  // someone else will respond later. Otherwise the prepared rsp buffer is
  // released here so it cannot leak.
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Prefer the thread-local error (terrno) when it carries more detail.
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  // Leadership/restore failures get an extra post-processing pass
  // (presumably to redirect the client — confirm in mndPostProcessQueryMsg).
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
4,184,735✔
82

83
// Queue callback for the mnode-sync and mnode-sync-rd workers: restores host
// byte order in the message head, forwards the message to the sync layer,
// and frees it (no response is sent from this path).
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // The head fields arrive in network byte order; convert before processing.
  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
191,229✔
100

101
// Enqueue pMsg on pWorker's queue while holding a reference on pMgmt so the
// mnode cannot be torn down underneath the write. Returns
// TSDB_CODE_MNODE_STOPPED when the mnode is already shutting down.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
115

116
// Route pMsg to the single-threaded mnode-write worker.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
119

120
// Route pMsg to the mnode-arb worker.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->arbWorker, pMsg);
}
123

124
// Route pMsg to the mnode-sync worker.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
127

128
// Route pMsg to the mnode-sync-rd worker.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
}
131

132
// Route pMsg to the mnode-read worker.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
135

136
// Route pMsg to the mnode-status worker.
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->statusWorker, pMsg);
}
139

140
// Validate and pre-process a query message, then enqueue it on the mnode
// query worker. Fails fast when the mnode is not yet attached or when
// pre-processing rejects the message.
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  if (pMgmt->pMnode == NULL) {
    int32_t code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;

  int32_t code = mndPreProcessQueryMsg(pMsg);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
157

158
// Route pMsg to the mnode-fetch worker.
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
}
161

162
// Copy an incoming RPC message into a freshly allocated queue item and route
// it to the worker that owns the given queue type. On success, ownership of
// the payload (pCont) moves from pRpc to the queued item; on enqueue failure
// the item and its payload are freed here. Returns 0 or an error code.
//
// Fix: the original left `code` uninitialized and relied on the `default:`
// label (which had no break) plus a `pWorker == NULL` check, so an unknown
// qtype could return an indeterminate value. Unknown types now return
// TSDB_CODE_INVALID_PARA explicitly.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  int32_t code = 0;

  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pWorker = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      // Unknown queue type: reject instead of reading an unset worker/code.
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pMsg = NULL;
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;  // payload ownership has moved to pMsg

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
214

215
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
2,193✔
216
  int32_t          code = 0;
2,193✔
217
  SSingleWorkerCfg qCfg = {
2,193✔
218
      .min = tsNumOfMnodeQueryThreads,
219
      .max = tsNumOfMnodeQueryThreads,
220
      .name = "mnode-query",
221
      .fp = (FItem)mmProcessRpcMsg,
222
      .param = pMgmt,
223
      .poolType = QUERY_AUTO_QWORKER_POOL,
224
  };
225
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
2,193!
226
    dError("failed to start mnode-query worker since %s", tstrerror(code));
×
227
    return code;
×
228
  }
229

230
  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;
2,193✔
231

232
  SSingleWorkerCfg fCfg = {
2,193✔
233
      .min = tsNumOfMnodeFetchThreads,
234
      .max = tsNumOfMnodeFetchThreads,
235
      .name = "mnode-fetch",
236
      .fp = (FItem)mmProcessRpcMsg,
237
      .param = pMgmt,
238
  };
239
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
2,193!
240
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
×
241
    return code;
×
242
  }
243

244
  SSingleWorkerCfg rCfg = {
2,193✔
245
      .min = tsNumOfMnodeReadThreads,
246
      .max = tsNumOfMnodeReadThreads,
247
      .name = "mnode-read",
248
      .fp = (FItem)mmProcessRpcMsg,
249
      .param = pMgmt,
250
  };
251
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
2,193!
252
    dError("failed to start mnode-read worker since %s", tstrerror(code));
×
253
    return code;
×
254
  }
255

256
  SSingleWorkerCfg stautsCfg = {
2,193✔
257
      .min = 1,
258
      .max = 1,
259
      .name = "mnode-status",
260
      .fp = (FItem)mmProcessRpcMsg,
261
      .param = pMgmt,
262
  };
263
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &stautsCfg)) != 0) {
2,193!
264
    dError("failed to start mnode-status worker since %s", tstrerror(code));
×
265
    return code;
×
266
  }
267

268
  SSingleWorkerCfg wCfg = {
2,193✔
269
      .min = 1,
270
      .max = 1,
271
      .name = "mnode-write",
272
      .fp = (FItem)mmProcessRpcMsg,
273
      .param = pMgmt,
274
  };
275
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
2,193!
276
    dError("failed to start mnode-write worker since %s", tstrerror(code));
×
277
    return code;
×
278
  }
279

280
  SSingleWorkerCfg sCfg = {
2,193✔
281
      .min = 1,
282
      .max = 1,
283
      .name = "mnode-sync",
284
      .fp = (FItem)mmProcessSyncMsg,
285
      .param = pMgmt,
286
  };
287
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
2,193!
288
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
×
289
    return code;
×
290
  }
291

292
  SSingleWorkerCfg scCfg = {
2,193✔
293
      .min = 1,
294
      .max = 1,
295
      .name = "mnode-sync-rd",
296
      .fp = (FItem)mmProcessSyncMsg,
297
      .param = pMgmt,
298
  };
299
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
2,193!
300
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
×
301
    return code;
×
302
  }
303

304
  SSingleWorkerCfg arbCfg = {
2,193✔
305
      .min = 1,
306
      .max = 1,
307
      .name = "mnode-arb",
308
      .fp = (FItem)mmProcessRpcMsg,
309
      .param = pMgmt,
310
  };
311
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
2,193!
312
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
×
313
    return code;
×
314
  }
315

316
  dDebug("mnode workers are initialized");
2,193✔
317
  return code;
2,193✔
318
}
319

320
// Stop all mnode workers: wait for every outstanding reference taken via
// mmAcquire() to be released, then clean up each worker pool.
// NOTE(review): assumes the caller has already set pMgmt->stopped so
// mmAcquire() refuses new references — confirm at the call site.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) taosMsleep(10);

  // Cleanup order matches the original explicit call sequence.
  SSingleWorker *workers[] = {
      &pMgmt->queryWorker, &pMgmt->fetchWorker, &pMgmt->readWorker, &pMgmt->statusWorker,
      &pMgmt->writeWorker, &pMgmt->arbWorker,   &pMgmt->syncWorker, &pMgmt->syncRdWorker,
  };
  for (int32_t i = 0; i < (int32_t)(sizeof(workers) / sizeof(workers[0])); ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  dDebug("mnode workers are closed");
}
2,193✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc