• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.86
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
// Take a reference on the mnode management handle so it cannot be torn down
// while a message is being queued. Returns 0 on success, or
// TSDB_CODE_MNODE_STOPPED once shutdown has begun.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t ret = TSDB_CODE_MNODE_STOPPED;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    // Still running: count this caller; mmRelease must be called to undo it.
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
    ret = 0;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return ret;
}
32

33
// Drop a reference previously taken by mmAcquire. Taken under the read lock,
// mirroring mmAcquire; mmStopWorker polls refCount to wait for drain.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
4,049,999✔
38

39
// Send a response for pMsg back through the transport layer. The response
// payload (rsp/rspLen) was stashed in pMsg->info by the message handler.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};  // zero remaining fields, as the designated init did
  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;
  tmsgSendRsp(&rsp);
}
2,754,041✔
48

49
// Generic consumer for the mnode worker queues (query/fetch/read/write/arb).
// Dispatches pMsg to mndProcessRpcMsg, replies to the client when
// appropriate, and always frees the message before returning.
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // Warn when handling exceeded PROCESS_THRESHOLD (2 seconds).
  // pInfo->timestamp is presumably the enqueue time in microseconds —
  // TODO confirm against the queue implementation.
  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // Reply only to requests that carry a client handle and are fully done;
  // in-progress requests will be answered later by the transaction machinery.
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Prefer the thread-local terrno when it holds a more specific error.
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    // No reply will be sent: release the pre-built response buffer, if any.
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  // Leadership/restore errors get extra handling — presumably so the client
  // can be redirected to the current leader; see mndPostProcessQueryMsg.
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
3,925,266✔
82

83
// Consumer for the mnode sync queues: fixes up the wire-format message head,
// hands the message to the sync layer, then frees it unconditionally.
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // The head arrives in network byte order; convert in place before use.
  SMsgHead *pHead = pMsg->pCont;
  pHead->vgId = ntohl(pHead->vgId);
  pHead->contLen = ntohl(pHead->contLen);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
124,780✔
100

101
// Enqueue pMsg onto pWorker's queue while holding a reference on pMgmt so
// the mnode cannot stop mid-enqueue. Returns 0 on success; on failure the
// caller keeps ownership of pMsg.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  int32_t code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
115

116
// Route a message to the single-threaded mnode write queue.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
119

120
// Route a message to the mnode arbitration queue.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
123

124
// Route a message to the mnode sync (write-side) queue.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
127

128
// Route a message to the mnode sync read queue.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
131

132
// Route a message to the mnode read queue.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
135

136
// Pre-process a query message and, if that succeeds, enqueue it on the
// mnode query queue. Returns an error code without enqueueing when the
// mnode is absent or pre-processing fails.
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // Guard: no mnode instance means nothing can process the query.
  if (pMgmt->pMnode == NULL) {
    const STraceId *trace = &pMsg->info.traceId;
    int32_t         code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }

  pMsg->info.node = pMgmt->pMnode;

  int32_t code = mndPreProcessQueryMsg(pMsg);
  if (code != 0) {
    const STraceId *trace = &pMsg->info.traceId;
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
153

154
// Route a message to the mnode fetch queue.
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
157

158
// Copy an incoming RPC message into a queue item and dispatch it to the
// worker selected by qtype. On success ownership of the payload (pCont)
// moves from pRpc to the queued item; on failure the item is freed here.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      // Unknown queue type: reject immediately, same as the original
      // pWorker==NULL path.
      return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg *pMsg = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  *pMsg = *pRpc;       // struct copy, equivalent to memcpy of sizeof(SRpcMsg)
  pRpc->pCont = NULL;  // payload ownership transferred to the queue item

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    // Enqueue failed: reclaim both the payload and the queue item.
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
207

208
// Start all mnode worker pools. The original seven copy-pasted init stanzas
// had drifted inconsistent error strings ("failed to start mnode mnode-sync
// worker"); this table-driven form initializes each worker in the same order
// as before and logs one uniform message on failure.
//
// Returns 0 on success, or the first tSingleWorkerInit error code (partially
// started workers are cleaned up by mmStopWorker, as before).
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t code = 0;

  // Each entry pairs a worker slot in pMgmt with its queue configuration.
  // Unlisted cfg fields are zero-initialized, exactly as the original
  // designated initializers left them.
  struct {
    SSingleWorker   *pWorker;  // destination worker owned by pMgmt
    SSingleWorkerCfg cfg;      // thread-pool configuration for that worker
  } workers[] = {
      {.pWorker = &pMgmt->queryWorker,
       .cfg = {.min = tsNumOfMnodeQueryThreads,
               .max = tsNumOfMnodeQueryThreads,
               .name = "mnode-query",
               .fp = (FItem)mmProcessRpcMsg,
               .param = pMgmt,
               // Only the query pool auto-scales its worker threads.
               .poolType = QUERY_AUTO_QWORKER_POOL}},
      {.pWorker = &pMgmt->fetchWorker,
       .cfg = {.min = tsNumOfMnodeFetchThreads,
               .max = tsNumOfMnodeFetchThreads,
               .name = "mnode-fetch",
               .fp = (FItem)mmProcessRpcMsg,
               .param = pMgmt}},
      {.pWorker = &pMgmt->readWorker,
       .cfg = {.min = tsNumOfMnodeReadThreads,
               .max = tsNumOfMnodeReadThreads,
               .name = "mnode-read",
               .fp = (FItem)mmProcessRpcMsg,
               .param = pMgmt}},
      // Write, sync, sync-rd and arb queues are strictly single-threaded to
      // preserve ordering.
      {.pWorker = &pMgmt->writeWorker,
       .cfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessRpcMsg, .param = pMgmt}},
      {.pWorker = &pMgmt->syncWorker,
       .cfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessSyncMsg, .param = pMgmt}},
      {.pWorker = &pMgmt->syncRdWorker,
       .cfg = {.min = 1, .max = 1, .name = "mnode-sync-rd", .fp = (FItem)mmProcessSyncMsg, .param = pMgmt}},
      {.pWorker = &pMgmt->arbWorker,
       .cfg = {.min = 1, .max = 1, .name = "mnode-arb", .fp = (FItem)mmProcessRpcMsg, .param = pMgmt}},
  };

  for (size_t i = 0; i < sizeof(workers) / sizeof(workers[0]); ++i) {
    if ((code = tSingleWorkerInit(workers[i].pWorker, &workers[i].cfg)) != 0) {
      dError("failed to start %s worker since %s", workers[i].cfg.name, tstrerror(code));
      return code;
    }
  }

  dDebug("mnode workers are initialized");
  return code;
}
298

299
// Stop all mnode workers: wait for in-flight references to drain, then tear
// the worker pools down in the original order.
//
// Fix: refCount is modified with atomic_add/sub_fetch_32 on other threads,
// but the old busy-wait read it with a plain load — a data race the compiler
// may legally hoist out of the loop. Read it atomically instead.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  // Drain: mmAcquire/mmRelease bracket every enqueue, so refCount reaching
  // zero means no thread is still inside mmPutMsgToWorker.
  while (atomic_load_32(&pMgmt->refCount) > 0) taosMsleep(10);

  // Cleanup order matches the original sequence.
  SSingleWorker *workers[] = {&pMgmt->queryWorker, &pMgmt->fetchWorker, &pMgmt->readWorker, &pMgmt->writeWorker,
                              &pMgmt->arbWorker,   &pMgmt->syncWorker,  &pMgmt->syncRdWorker};
  for (size_t i = 0; i < sizeof(workers) / sizeof(workers[0]); ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  dDebug("mnode workers are closed");
}
1,701✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc