• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4113

17 May 2025 06:43AM UTC coverage: 62.054% (-0.8%) from 62.857%
#4113

push

travis-ci

web-flow
Merge pull request #31115 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

154737 of 318088 branches covered (48.65%)

Branch coverage included in aggregate %.

175 of 225 new or added lines in 20 files covered. (77.78%)

5853 existing lines in 216 files now uncovered.

239453 of 317147 relevant lines covered (75.5%)

15121865.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.9
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
// Take a reference on the mnode management handle.
// Returns 0 and bumps refCount while the mnode is still running; returns
// TSDB_CODE_MNODE_STOPPED without taking a reference once it has stopped.
// The read lock guards the `stopped` flag; the counter itself is atomic.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t ret = TSDB_CODE_MNODE_STOPPED;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
    ret = 0;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return ret;
}
32

33
// Drop a reference taken by mmAcquire.
// The decrement is performed under the same read lock mmAcquire uses for its
// stopped-check; the counter update itself is atomic.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
38

39
// Send an RPC response for the given request, carrying the result code and
// whatever response payload was stored in pMsg->info.rsp / rspLen.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};
  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;
  tmsgSendRsp(&rsp);
}
48

49
// Drain callback for the mnode rpc worker queues (query/fetch/read/status/
// write/arb). Dispatches one message to mndProcessRpcMsg, replies to the
// sender when the message is a completed request, and always frees the
// message before returning.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* trace macros
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // Warn when the message occupied the worker longer than PROCESS_THRESHOLD
  // (2,000,000 us, i.e. 2 seconds); cost is logged in whole seconds.
  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // Finished requests get an immediate response; in-progress requests and
  // non-requests instead have any prepared response buffer released unused.
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (code != 0 && terrno != 0) code = terrno;  // prefer the thread-local error detail when set
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  // Give query messages post-processing when this node is not the sync leader
  // or is still restoring.
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
82

83
// Drain callback for the mnode sync and sync-rd queues.
// Converts the message head from network byte order, hands the message to
// mndProcessSyncMsg, and frees it; no rpc response is sent from here.
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* trace macros
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // Sync messages arrive with a network-byte-order SMsgHead; fix it up in
  // place before processing.
  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
100

101
// Enqueue a message on one of the mnode worker queues.
// Holds a reference on pMgmt (via mmAcquire/mmRelease) for the duration of
// the enqueue so the mnode cannot be torn down mid-write; fails with
// TSDB_CODE_MNODE_STOPPED once the mnode has been stopped.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by the dG* trace macros
  int32_t         code = 0;
  if ((code = mmAcquire(pMgmt)) == 0) {
    dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
    code = taosWriteQitem(pWorker->queue, pMsg);
    mmRelease(pMgmt);
    return code;
  } else {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
}
115

116
// Route a message to the mnode write queue.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
119

120
// Route a message to the mnode arbitration queue.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
123

124
// Route a message to the mnode sync queue.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
127

128
// Route a message to the mnode sync-read queue.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
131

132
// Route a message to the mnode read queue.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
135

136
// Route a message to the mnode status queue.
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->statusWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
139

140
// Enqueue a query message after validating that an mnode instance is
// attached and letting mndPreProcessQueryMsg inspect the message first.
// Returns TSDB_CODE_MNODE_NOT_FOUND when no mnode is attached, the
// pre-processing error when that step fails, otherwise the enqueue result.
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t code = 0;
  if (NULL == pMgmt->pMnode) {
    const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by dGError
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }
  pMsg->info.node = pMgmt->pMnode;
  if ((code = mndPreProcessQueryMsg(pMsg)) != 0) {
    const STraceId *trace = &pMsg->info.traceId;  // referenced implicitly by dGError
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }
  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
157

158
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
639,086✔
159
  return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
639,086✔
160
}
161

162
/**
 * Copy an RPC message into a freshly allocated queue item and dispatch it to
 * the worker that serves the given queue type.
 *
 * On success, ownership of the content buffer (pRpc->pCont) passes to the
 * queue item and pRpc->pCont is cleared, so the caller must not free it.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for an unknown queue type, or
 * the error from item allocation / enqueueing.
 */
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  int32_t code = 0;  // initialized: was previously read uninitialized if a case set neither code nor pWorker

  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case STATUS_QUEUE:
      pWorker = &pMgmt->statusWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      code = TSDB_CODE_INVALID_PARA;
      break;  // explicit break guards against cases being appended later
  }

  if (pWorker == NULL) return code;

  SRpcMsg *pMsg;
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  // Shallow-copy the message; the queue item now owns the content buffer.
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    // Enqueue failed: free both the content (owned by the item now) and the item.
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
214

215
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
2,061✔
216
  int32_t          code = 0;
2,061✔
217
  SSingleWorkerCfg qCfg = {
2,061✔
218
      .min = tsNumOfMnodeQueryThreads,
219
      .max = tsNumOfMnodeQueryThreads,
220
      .name = "mnode-query",
221
      .fp = (FItem)mmProcessRpcMsg,
222
      .param = pMgmt,
223
      .poolType = QUERY_AUTO_QWORKER_POOL,
224
  };
225
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
2,061!
226
    dError("failed to start mnode-query worker since %s", tstrerror(code));
×
227
    return code;
×
228
  }
229

230
  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;
2,061✔
231

232
  SSingleWorkerCfg fCfg = {
2,061✔
233
      .min = tsNumOfMnodeFetchThreads,
234
      .max = tsNumOfMnodeFetchThreads,
235
      .name = "mnode-fetch",
236
      .fp = (FItem)mmProcessRpcMsg,
237
      .param = pMgmt,
238
  };
239
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
2,061!
240
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
×
241
    return code;
×
242
  }
243

244
  SSingleWorkerCfg rCfg = {
2,061✔
245
      .min = tsNumOfMnodeReadThreads,
246
      .max = tsNumOfMnodeReadThreads,
247
      .name = "mnode-read",
248
      .fp = (FItem)mmProcessRpcMsg,
249
      .param = pMgmt,
250
  };
251
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
2,061!
252
    dError("failed to start mnode-read worker since %s", tstrerror(code));
×
253
    return code;
×
254
  }
255

256
  SSingleWorkerCfg stautsCfg = {
2,061✔
257
      .min = 1,
258
      .max = 1,
259
      .name = "mnode-status",
260
      .fp = (FItem)mmProcessRpcMsg,
261
      .param = pMgmt,
262
  };
263
  if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &stautsCfg)) != 0) {
2,061!
264
    dError("failed to start mnode-status worker since %s", tstrerror(code));
×
265
    return code;
×
266
  }
267

268
  SSingleWorkerCfg wCfg = {
2,061✔
269
      .min = 1,
270
      .max = 1,
271
      .name = "mnode-write",
272
      .fp = (FItem)mmProcessRpcMsg,
273
      .param = pMgmt,
274
  };
275
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
2,061!
276
    dError("failed to start mnode-write worker since %s", tstrerror(code));
×
277
    return code;
×
278
  }
279

280
  SSingleWorkerCfg sCfg = {
2,061✔
281
      .min = 1,
282
      .max = 1,
283
      .name = "mnode-sync",
284
      .fp = (FItem)mmProcessSyncMsg,
285
      .param = pMgmt,
286
  };
287
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
2,061!
288
    dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
×
289
    return code;
×
290
  }
291

292
  SSingleWorkerCfg scCfg = {
2,061✔
293
      .min = 1,
294
      .max = 1,
295
      .name = "mnode-sync-rd",
296
      .fp = (FItem)mmProcessSyncMsg,
297
      .param = pMgmt,
298
  };
299
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
2,061!
300
    dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
×
301
    return code;
×
302
  }
303

304
  SSingleWorkerCfg arbCfg = {
2,061✔
305
      .min = 1,
306
      .max = 1,
307
      .name = "mnode-arb",
308
      .fp = (FItem)mmProcessRpcMsg,
309
      .param = pMgmt,
310
  };
311
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
2,061!
312
    dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
×
313
    return code;
×
314
  }
315

316
  dDebug("mnode workers are initialized");
2,061✔
317
  return code;
2,061✔
318
}
319

320
// Stop the mnode workers: wait for all in-flight references taken via
// mmAcquire to drain, then tear down every worker queue.
// NOTE(review): assumes pMgmt->stopped has already been set elsewhere so no
// new references can be acquired; otherwise this loop may never terminate.
// NOTE(review): refCount is polled here without the lock or an atomic load —
// presumably acceptable as a drain heuristic, but worth confirming.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  while (pMgmt->refCount > 0) taosMsleep(10);

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);
  dDebug("mnode workers are closed");
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc