taosdata / TDengine / build #3562 (push · travis-ci · web-flow)

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%

Merge pull request #29229 from taosdata/enh/TS-5749-3.0
enh: separate tsdb async tasks to different thread pools

  • 21498 of 109421 branches covered (19.65%); branch coverage is included in the aggregate %.
  • 66 of 96 new or added lines in 7 files covered (68.75%).
  • 39441 existing lines in 157 files now uncovered.
  • 35007 of 102566 relevant lines covered (34.13%).
  • 53922.97 hits per line.
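
The aggregate percentage is consistent with lines and branches being pooled into a single ratio, as the "branch coverage is included in the aggregate %" note suggests:

  (35007 + 21498) covered / (102566 + 109421) relevant = 56505 / 211987 ≈ 26.655%

Line coverage alone is 35007 / 102566 ≈ 34.13%, and branch coverage alone is 21498 / 109421 ≈ 19.65%.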

Source File: /source/dnode/mgmt/mgmt_mnode/src/mmWorker.c (58.98% covered)

/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mmInt.h"

// Threshold (in microseconds) beyond which message handling is logged as slow.
#define PROCESS_THRESHOLD (2000 * 1000)

// Take a read-side reference on the mnode management handle; fails with
// TSDB_CODE_MNODE_STOPPED once the mnode has been stopped.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t code = 0;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (pMgmt->stopped) {
    code = TSDB_CODE_MNODE_STOPPED;
  } else {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return code;
}

// Drop a reference taken by mmAcquire.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}

static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {
      .code = code,
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
}

// Worker callback for the rpc-style mnode queues: process the message, warn if
// handling exceeded PROCESS_THRESHOLD, send a response for requests (or free
// the prepared response otherwise), then free the message.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

// Worker callback for the sync queues: convert the message head to host byte
// order, hand the message to sync processing, then free it.
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

// Enqueue a message into the given worker queue while holding a reference on
// pMgmt, so the mnode cannot be stopped underneath the write.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    code = taosWriteQitem(pWorker->queue, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}

int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}

int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->arbWorker, pMsg);
}

int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}

int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
}

int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}

int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t code = 0;
  if (NULL == pMgmt->pMnode) {
    const STraceId *trace = &pMsg->info.traceId;
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }
  pMsg->info.node = pMgmt->pMnode;
  if ((code = mndPreProcessQueryMsg(pMsg)) != 0) {
    const STraceId *trace = &pMsg->info.traceId;
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}

int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
}

// Copy an incoming RPC message into a freshly allocated queue item and route
// it to the worker that serves the requested queue type.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  int32_t code;

  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      code = TSDB_CODE_INVALID_PARA;
  }

  if (pWorker == NULL) return code;

  SRpcMsg *pMsg;
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}

// Create the per-queue single workers: query, fetch, read, write, sync,
// sync-rd, and arb.
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}

// Wait for in-flight references to drain, then tear down all worker queues.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) taosMsleep(10);

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);
  dDebug("mnode workers are closed");
}
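
The lifecycle pattern worth noting in this listing is the pairing of mmAcquire/mmRelease with the refCount drain in mmStopWorker: every enqueue first takes a reference under a read lock and checks a stopped flag, and shutdown waits until all such references are released before cleaning up the queues. Below is a minimal standalone sketch of that guard using plain pthreads and C11 atomics; the names Mgmt, mgmt_acquire, mgmt_release, and mgmt_stop are hypothetical and not TDengine's API, and the real code differs in details (mmRelease also takes the read lock, and the setting of the stopped flag lives outside this file).

// Sketch only: illustrates the stop-flag + reference-count guard used by
// mmAcquire/mmRelease/mmStopWorker, not TDengine's actual implementation.
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <unistd.h>

typedef struct {
  pthread_rwlock_t lock;      // guards 'stopped' against concurrent acquires
  atomic_int       refCount;  // number of in-flight enqueue operations
  bool             stopped;   // once set, no new references may be taken
} Mgmt;

static Mgmt g_mgmt = {.lock = PTHREAD_RWLOCK_INITIALIZER};

// Take a reference, or fail if shutdown has already begun.
static int mgmt_acquire(Mgmt *m) {
  int rc = 0;
  pthread_rwlock_rdlock(&m->lock);
  if (m->stopped) {
    rc = -1;  // mmAcquire returns TSDB_CODE_MNODE_STOPPED at this point
  } else {
    atomic_fetch_add(&m->refCount, 1);
  }
  pthread_rwlock_unlock(&m->lock);
  return rc;
}

// Drop a previously taken reference.
static void mgmt_release(Mgmt *m) { atomic_fetch_sub(&m->refCount, 1); }

// Shutdown: flip the flag under the write lock so no new reference can be
// taken, then wait until every holder has called mgmt_release().
static void mgmt_stop(Mgmt *m) {
  pthread_rwlock_wrlock(&m->lock);
  m->stopped = true;
  pthread_rwlock_unlock(&m->lock);
  while (atomic_load(&m->refCount) > 0) usleep(10 * 1000);
}

The key property of this sketch is that once mgmt_stop has released the write lock, no caller can observe stopped as false, so the drain loop cannot race with a late reference.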