• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
/* Processing-time limit compared against a taosGetTimestampUs() delta in
 * mmProcessRpcMsg: 2000 * 1000 us = 2 s. Slower messages are logged as a warning. */
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
/**
 * Take one reference on the mnode management object unless it is stopped.
 *
 * Both the check of pMgmt->stopped and the increment of pMgmt->refCount are
 * done while pMgmt->lock is held in read mode.
 *
 * @param pMgmt  mnode management object.
 * @return 0 when a reference was taken; TSDB_CODE_MNODE_STOPPED when
 *         pMgmt->stopped is set (refCount is left untouched in that case).
 */
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t result = TSDB_CODE_MNODE_STOPPED;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
    result = 0;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return result;
}
32

33
/**
 * Drop one reference on the mnode management object.
 *
 * pMgmt->refCount is decremented atomically while pMgmt->lock is held in
 * read mode. Lock/unlock return values are deliberately ignored.
 *
 * @param pMgmt  mnode management object.
 */
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
×
38

39
/**
 * Send a response for pMsg through tmsgSendRsp().
 *
 * The response carries the given code, uses pMsg->info.rsp / pMsg->info.rspLen
 * as its content, and copies pMsg->info. All other fields are zero.
 *
 * @param pMsg  request being answered.
 * @param code  result code placed in the response.
 */
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;
  reply.info = pMsg->info;

  tmsgSendRsp(&reply);
}
×
48

49
/**
 * Queue callback: process one RPC message taken from an mnode worker queue.
 *
 * Steps, in order:
 *   1. attach pMgmt->pMnode to the message and run mndProcessRpcMsg();
 *   2. when pInfo->timestamp is set, warn if handling took longer than
 *      PROCESS_THRESHOLD;
 *   3. send a response for requests that have a handle and are not still in
 *      progress, otherwise release pMsg->info.rsp;
 *   4. call mndPostProcessQueryMsg() for NOT_LEADER / RESTORING results;
 *   5. release the message content and the queue item.
 *
 * @param pInfo  queue info; pInfo->ahandle is the SMnodeMgmt.
 * @param pMsg   queue item to process; released before returning.
 */
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // presumably consumed by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  if (!IsReq(pMsg) || pMsg->info.handle == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    // No response is sent on this path, so the prepared response buffer is released here.
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    // Prefer the more specific terrno when the handler failed and terrno is set.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
×
82

83
/**
 * Queue callback: process one message taken from an mnode sync queue.
 *
 * The SMsgHead at the start of pMsg->pCont has its contLen and vgId fields
 * converted in place with ntohl() before mndProcessSyncMsg() is called.
 * The message content and queue item are released afterwards; no response
 * is sent from here.
 *
 * @param pInfo  queue info; pInfo->ahandle is the SMnodeMgmt.
 * @param pMsg   queue item to process; released before returning.
 */
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // presumably consumed by the dG* log macros

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHdr = pMsg->pCont;
  pHdr->contLen = ntohl(pHdr->contLen);
  pHdr->vgId = ntohl(pHdr->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
×
100

101
/**
 * Write pMsg into the queue of the given worker.
 *
 * A reference on pMgmt is taken with mmAcquire() for the duration of the
 * write and dropped with mmRelease() afterwards.
 *
 * @param pMgmt    mnode management object.
 * @param pWorker  destination worker; pWorker->queue receives the item.
 * @param pMsg     queue item to enqueue.
 * @return the mmAcquire() error when the reference cannot be taken (nothing is
 *         enqueued), otherwise the result of taosWriteQitem().
 */
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // presumably consumed by the dG* log macros

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);

  return code;
}
115

116
/* Enqueue pMsg on pMgmt->writeWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
119

120
/* Enqueue pMsg on pMgmt->arbWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
123

124
/* Enqueue pMsg on pMgmt->syncWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
127

128
/* Enqueue pMsg on pMgmt->syncRdWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
131

132
/* Enqueue pMsg on pMgmt->readWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
135

136
/* Enqueue pMsg on pMgmt->statusWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
139

140
/**
 * Pre-process a query message and enqueue it on pMgmt->queryWorker.
 *
 * @param pMgmt  mnode management object.
 * @param pMsg   queue item to enqueue; pMsg->info.node is set to pMgmt->pMnode.
 * @return TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the error from
 *         mndPreProcessQueryMsg() when pre-processing fails, otherwise the
 *         result of mmPutMsgToWorker(). Nothing is enqueued on the two error
 *         paths, and pMsg is not released here.
 */
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // presumably consumed by the dG* log macros
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;

  code = mndPreProcessQueryMsg(pMsg);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
157

158
/* Enqueue pMsg on pMgmt->fetchWorker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
161

162
/**
 * Copy pRpc into a newly allocated queue item and enqueue it on the worker
 * selected by qtype.
 *
 * Content ownership: once the queue item has been allocated, pRpc->pCont is
 * moved into the item and pRpc->pCont is set to NULL. If enqueueing then
 * fails, the content and the item are released here.
 *
 * @param pMgmt  mnode management object.
 * @param qtype  which worker queue to use.
 * @param pRpc   message to enqueue; pRpc->pCont is NULL on return unless the
 *               function failed before the queue item was allocated.
 * @return TSDB_CODE_INVALID_PARA for an unhandled qtype, the error from
 *         taosAllocateQitem(), or the result of mmPutMsgToWorker().
 */
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pTarget = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      pTarget = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pTarget = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pTarget = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pTarget = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pTarget = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pTarget = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pTarget = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pTarget = &pMgmt->syncRdWorker;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pItem;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pItem);
  if (code != 0) {
    return code;
  }

  // The queue item now owns the content; clear the caller's pointer.
  memcpy(pItem, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pItem, pTarget->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pTarget, pItem);
  if (code != 0) {
    dTrace("msg:%p, is freed", pItem);
    rpcFreeCont(pItem->pCont);
    taosFreeQitem(pItem);
  }

  return code;
}
214

215
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
×
216
  int32_t          code = 0;
×
217
  SSingleWorkerCfg qCfg = {
×
218
      .min = tsNumOfMnodeQueryThreads,
219
      .max = tsNumOfMnodeQueryThreads,
220
      .name = "mnode-query",
221
      .fp = (FItem)mmProcessRpcMsg,
222
      .param = pMgmt,
223
      .poolType = QUERY_AUTO_QWORKER_POOL,
224
  };
225
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
×
226
    dError("failed to start mnode-query worker since %s", tstrerror(code));
×
227
    return code;
×
228
  }
229

230
  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;
×
231

232
  SSingleWorkerCfg fCfg = {
×
233
      .min = tsNumOfMnodeFetchThreads,
234
      .max = tsNumOfMnodeFetchThreads,
235
      .name = "mnode-fetch",
236
      .fp = (FItem)mmProcessRpcMsg,
237
      .param = pMgmt,
238
  };
239
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
×
240
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
×
241
    return code;
×
242
  }
243

244
  SSingleWorkerCfg rCfg = {
×
245
      .min = tsNumOfMnodeReadThreads,
246
      .max = tsNumOfMnodeReadThreads,
247
      .name = "mnode-read",
248
      .fp = (FItem)mmProcessRpcMsg,
249
      .param = pMgmt,
250
  };
251
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
×
252
    dError("failed to start mnode-read worker since %s", tstrerror(code));
×
253
    return code;
×
254
  }
255

256
  SSingleWorkerCfg stautsCfg = {
×
257
      .min = 1,
258
      .max = 1,
259
      .name = "mnode-status",
260
      .fp = (FItem)mmProcessRpcMsg,
261
      .param = pMgmt,
262
  };
263
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &stautsCfg)) != 0) {
×
264
    dError("failed to start mnode-status worker since %s", tstrerror(code));
×
265
    return code;
×
266
  }
267

268
  SSingleWorkerCfg wCfg = {
×
269
      .min = 1,
270
      .max = 1,
271
      .name = "mnode-write",
272
      .fp = (FItem)mmProcessRpcMsg,
273
      .param = pMgmt,
274
  };
275
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
×
276
    dError("failed to start mnode-write worker since %s", tstrerror(code));
×
277
    return code;
×
278
  }
279

280
  SSingleWorkerCfg sCfg = {
×
281
      .min = 1,
282
      .max = 1,
283
      .name = "mnode-sync",
284
      .fp = (FItem)mmProcessSyncMsg,
285
      .param = pMgmt,
286
  };
287
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
×
288
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
×
289
    return code;
×
290
  }
291

292
  SSingleWorkerCfg scCfg = {
×
293
      .min = 1,
294
      .max = 1,
295
      .name = "mnode-sync-rd",
296
      .fp = (FItem)mmProcessSyncMsg,
297
      .param = pMgmt,
298
  };
299
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
×
300
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
×
301
    return code;
×
302
  }
303

304
  SSingleWorkerCfg arbCfg = {
×
305
      .min = 1,
306
      .max = 1,
307
      .name = "mnode-arb",
308
      .fp = (FItem)mmProcessRpcMsg,
309
      .param = pMgmt,
310
  };
311
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
×
312
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
×
313
    return code;
×
314
  }
315

316
  dDebug("mnode workers are initialized");
×
317
  return code;
×
318
}
319

320
/**
 * Shut down all mnode workers.
 *
 * Polls every 10 ms until pMgmt->refCount is no longer positive, then cleans
 * the workers up in this order: query, fetch, read, status, write, arb,
 * sync, sync-rd.
 *
 * @param pMgmt  mnode management object owning the workers.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  // Wait for outstanding mmAcquire() references to be released.
  while (pMgmt->refCount > 0) {
    taosMsleep(10);
  }

  SSingleWorker *workers[] = {
      &pMgmt->queryWorker, &pMgmt->fetchWorker, &pMgmt->readWorker, &pMgmt->statusWorker,
      &pMgmt->writeWorker, &pMgmt->arbWorker,   &pMgmt->syncWorker, &pMgmt->syncRdWorker,
  };
  const int32_t numWorkers = (int32_t)(sizeof(workers) / sizeof(workers[0]));

  for (int32_t i = 0; i < numWorkers; ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  dDebug("mnode workers are closed");
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc