• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3818

01 Apr 2025 07:46AM UTC coverage: 34.065% (-0.02%) from 34.08%
#3818

push

travis-ci

happyguoxy
test:alter gcda dir

148531 of 599532 branches covered (24.77%)

Branch coverage included in aggregate %.

222425 of 489446 relevant lines covered (45.44%)

762721.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.17
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
/* Processing-time limit compared against a taosGetTimestampUs() delta in mmProcessRpcMsg
 * (i.e. 2,000,000 us = 2 s); messages exceeding it trigger a warning log. */
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
/*
 * Take a reference on the mnode management object.
 *
 * While holding the read lock, pMgmt->stopped is inspected: if it is not set,
 * refCount is atomically incremented and 0 is returned; otherwise refCount is
 * left untouched and TSDB_CODE_MNODE_STOPPED is returned.
 */
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t ret = TSDB_CODE_MNODE_STOPPED;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
    ret = 0;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return ret;
}
32

33
/*
 * Drop one reference on the mnode management object.
 *
 * refCount is atomically decremented while the read lock is held. Unlike
 * mmAcquire, the stopped flag is not consulted here.
 */
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
23,264✔
38

39
/*
 * Build a response for pMsg and hand it to tmsgSendRsp().
 *
 * The response carries the given code, the response buffer/length stored in
 * pMsg->info (rsp / rspLen) and a copy of pMsg->info itself. All other fields
 * of the response are zero.
 */
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};

  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;

  tmsgSendRsp(&rsp);
}
9,549✔
48

49
/*
 * Queue callback for the RPC-style mnode workers.
 *
 * Steps, in order:
 *   1. attach the mnode handle to the message and run mndProcessRpcMsg();
 *   2. if pInfo->timestamp is set, warn when the elapsed time exceeds
 *      PROCESS_THRESHOLD;
 *   3. either send a response (request with a handle, not in progress) or
 *      free the pending response buffer;
 *   4. run mndPostProcessQueryMsg() for the not-leader / restoring codes;
 *   5. free the message content and the queue item.
 */
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  // Not referenced directly below; presumably consumed by the dG* logging macros,
  // so the name must stay `trace` (verify against the macro definitions).
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // A zero timestamp means no measurement is available; treat it as zero elapsed time.
  int64_t elapsed = (pInfo->timestamp != 0) ? (taosGetTimestampUs() - pInfo->timestamp) : 0;
  if (elapsed > PROCESS_THRESHOLD) {
    dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
           TMSG_INFO(pMsg->msgType), elapsed / (1000 * 1000));
  }

  if (!IsReq(pMsg) || pMsg->info.handle == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    // No response is sent on this path, so the prepared response buffer is released here.
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  } else {
    // Prefer the thread-local error code when the handler failed and terrno is set.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
17,635✔
82

83
/*
 * Queue callback for the mnode sync workers.
 *
 * Attaches the mnode handle, converts the contLen and vgId fields of the
 * message head in place with ntohl(), passes the message to
 * mndProcessSyncMsg(), then frees the message content and the queue item.
 * The result code is only logged.
 */
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  // Presumably consumed by the dG* logging macros; keep the name `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.node = pMgmt->pMnode;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // The head sits at the start of the content buffer; fix up its integer fields in place.
  SMsgHead *pHdr = pMsg->pCont;
  pHdr->contLen = ntohl(pHdr->contLen);
  pHdr->vgId = ntohl(pHdr->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);
  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
5,629✔
100

101
/*
 * Write pMsg into the queue of the given worker.
 *
 * A reference is taken with mmAcquire() for the duration of the write. If the
 * acquire fails, nothing is enqueued, pMsg is not freed here, and the acquire
 * error is returned. Otherwise the result of taosWriteQitem() is returned.
 */
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  // Presumably consumed by the dG* logging macros; keep the name `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);

  return code;
}
115

116
/* Enqueue pMsg on the mnode write worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
119

120
/* Enqueue pMsg on the mnode arb worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
123

124
/* Enqueue pMsg on the mnode sync worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
127

128
/* Enqueue pMsg on the mnode sync-rd worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
131

132
/* Enqueue pMsg on the mnode read worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
135

136
/* Enqueue pMsg on the mnode status worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
139

140
/*
 * Pre-process a query message and enqueue it on the mnode query worker.
 *
 * Returns TSDB_CODE_MNODE_NOT_FOUND when pMgmt->pMnode is NULL, the error from
 * mndPreProcessQueryMsg() when pre-processing fails, and otherwise the result
 * of mmPutMsgToWorker(). On the two error paths the message is not enqueued
 * and is not freed here.
 */
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // Presumably consumed by the dG* logging macros; keep the name `trace`.
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;

  code = mndPreProcessQueryMsg(pMsg);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
157

158
/* Enqueue pMsg on the mnode fetch worker; returns the result of mmPutMsgToWorker(). */
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
161

162
/*
 * Copy pRpc into a freshly allocated queue item and enqueue it on the worker
 * selected by qtype.
 *
 * Returns TSDB_CODE_INVALID_PARA for an unknown qtype, the allocation error if
 * taosAllocateQitem() fails, and otherwise the result of mmPutMsgToWorker().
 *
 * Once the item has been allocated, the content pointer is moved from pRpc to
 * the new item (pRpc->pCont is set to NULL). If enqueueing then fails, both
 * the content and the item are freed here.
 */
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pTarget = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      pTarget = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pTarget = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pTarget = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pTarget = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pTarget = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pTarget = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pTarget = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pTarget = &pMgmt->syncRdWorker;
      break;
    default:
      // Unknown queue type: nothing has been allocated yet, so just report it.
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pItem = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pItem);
  if (code) {
    return code;
  }

  // Shallow copy; the content buffer now belongs to the queue item.
  memcpy(pItem, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pItem, pTarget->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);

  code = mmPutMsgToWorker(pMgmt, pTarget, pItem);
  if (code == 0) {
    return code;
  }

  dTrace("msg:%p, is freed", pItem);
  rpcFreeCont(pItem->pCont);
  taosFreeQitem(pItem);
  return code;
}
214

215
/*
 * Initialize all mnode workers: query, fetch, read, status, write, sync,
 * sync-rd and arb, in that order.
 *
 * Query/fetch/read thread counts come from the tsNumOfMnode*Threads settings;
 * the remaining workers use exactly one thread. After the query worker starts,
 * tsNumOfQueryThreads is increased by tsNumOfMnodeQueryThreads.
 *
 * Returns 0 on success, or the error of the first tSingleWorkerInit() that
 * fails.
 *
 * NOTE(review): on failure this function returns immediately without cleaning
 * up workers that were already started here; presumably the caller invokes
 * mmStopWorker() — verify.
 * NOTE(review): the sync / sync-rd / arb error messages contain a duplicated
 * "mnode" word; left untouched to keep log output unchanged.
 */
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t code = 0;

  SSingleWorkerCfg queryCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  code = tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg);
  if (code != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg fetchCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg);
  if (code != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg readCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->readWorker, &readCfg);
  if (code != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-status",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg);
  if (code != 0) {
    dError("failed to start mnode-status worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg writeCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->writeWorker, &writeCfg);
  if (code != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  // The two sync workers use the sync-specific handler instead of the RPC one.
  SSingleWorkerCfg syncCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->syncWorker, &syncCfg);
  if (code != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg syncRdCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->syncRdWorker, &syncRdCfg);
  if (code != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbiterCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  code = tSingleWorkerInit(&pMgmt->arbWorker, &arbiterCfg);
  if (code != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
319

320
/*
 * Wait until no references remain, then clean up every mnode worker.
 *
 * Polls pMgmt->refCount every 10 ms until it is no longer positive, then calls
 * tSingleWorkerCleanup() on the query, fetch, read, status, write, arb, sync
 * and sync-rd workers, in that order.
 *
 * NOTE(review): refCount is updated with atomic_* helpers elsewhere in this
 * file but is read here with a plain load — confirm this is intentional.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) {
    taosMsleep(10);
  }

  SSingleWorker *workers[] = {
      &pMgmt->queryWorker, &pMgmt->fetchWorker, &pMgmt->readWorker, &pMgmt->statusWorker,
      &pMgmt->writeWorker, &pMgmt->arbWorker,   &pMgmt->syncWorker, &pMgmt->syncRdWorker,
  };
  const int32_t numWorkers = (int32_t)(sizeof(workers) / sizeof(workers[0]));

  for (int32_t i = 0; i < numWorkers; ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  dDebug("mnode workers are closed");
}
60✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc