• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mmInt.h"
18

19
#define PROCESS_THRESHOLD (2000 * 1000)
20

21
// Take one reference on the mnode management object.
// Returns 0 on success, or TSDB_CODE_MNODE_STOPPED once shutdown has begun.
// The read lock guards the `stopped` flag; the refcount itself is atomic and
// is drained by mmStopWorker before the worker queues are destroyed.
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
  int32_t ret = 0;
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  if (!pMgmt->stopped) {
    (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
  } else {
    ret = TSDB_CODE_MNODE_STOPPED;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return ret;
}
32

33
// Drop one reference previously taken via mmAcquire.
// Holds the rwlock in read mode around the atomic decrement, mirroring
// mmAcquire; mmStopWorker spin-waits on refCount reaching zero before
// tearing down the worker queues.
static inline void mmRelease(SMnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
38

39
// Send an RPC response for pMsg with the given result code, reusing the
// response buffer (rsp/rspLen) already attached to the message's info.
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};
  rsp.code = code;
  rsp.pCont = pMsg->info.rsp;
  rsp.contLen = pMsg->info.rspLen;
  rsp.info = pMsg->info;
  tmsgSendRsp(&rsp);
}
48

49
// Worker callback for the mnode query/fetch/read/write/arb queues.
// Dispatches the message into the mnode, replies to request messages,
// and always frees the queue item before returning (the worker owns pMsg).
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  // `trace` is consumed implicitly by the dG* logging macros below.
  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = mndProcessRpcMsg(pMsg, pInfo);

  // Warn when the message spent more than PROCESS_THRESHOLD (2s) in handling.
  // NOTE(review): the log text lacks a space after "worker:%d," — cosmetic.
  if (pInfo->timestamp != 0) {
    int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
    if (cost > PROCESS_THRESHOLD) {
      dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
             TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
    }
  }

  // Requests (except those still in progress) get a response; for everything
  // else the pre-allocated response buffer is released here instead.
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Prefer the thread-local error over a bare failure code when both are set.
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
  } else {
    rpcFreeCont(pMsg->info.rsp);
    pMsg->info.rsp = NULL;
  }

  // Leadership/restore failures get query post-processing (e.g. redirect).
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    mndPostProcessQueryMsg(pMsg);
  }

  dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
82

83
// Worker callback for the mnode-sync and mnode-sync-rd queues.
// Converts the message head from network byte order, hands the message to
// the sync layer, then frees the queue item (the worker owns pMsg).
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  pMsg->info.node = pMgmt->pMnode;

  // `trace` is consumed implicitly by the dG* logging macros below.
  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from mnode-sync queue", pMsg);

  // Head fields arrive in network byte order; fix them up in place.
  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
100

101
// Enqueue pMsg on the given worker queue while holding a reference on pMgmt,
// so the queues cannot be torn down underneath us.
// Returns 0 on success, TSDB_CODE_MNODE_STOPPED during shutdown, or the
// queue-write error code.
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = mmAcquire(pMgmt);
  if (code != 0) {
    dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
  code = taosWriteQitem(pWorker->queue, pMsg);
  mmRelease(pMgmt);
  return code;
}
115

116
// Route pMsg to the mnode write queue.
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
119

120
// Route pMsg to the mnode arbitration queue.
int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->arbWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
123

124
// Route pMsg to the mnode sync queue.
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
127

128
// Route pMsg to the mnode sync-read queue.
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncRdWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
131

132
// Route pMsg to the mnode read queue.
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
135

136
// Pre-process a query message and route it to the mnode query queue.
// Fails fast when no mnode instance is attached or when pre-processing
// rejects the message; in both cases the caller retains ownership of pMsg.
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // `trace` is consumed implicitly by the dG* logging macros below.
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         code = 0;

  if (pMgmt->pMnode == NULL) {
    code = TSDB_CODE_MNODE_NOT_FOUND;
    dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
    return code;
  }
  pMsg->info.node = pMgmt->pMnode;

  code = mndPreProcessQueryMsg(pMsg);
  if (code != 0) {
    dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
    return code;
  }

  return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
153

154
// Route pMsg to the mnode fetch queue.
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->fetchWorker;
  return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
157

158
// Copy the RPC message pRpc into a freshly allocated queue item and route it
// to the worker serving the given queue type.
// On success, ownership of pRpc->pCont transfers to the queued item and
// pRpc->pCont is set to NULL; on enqueue failure the copied item (and its
// content) is freed here. Returns 0 on success or an error code.
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  // Initialize up front so the "unknown queue type" path can never return an
  // indeterminate value if the switch below is ever extended.
  int32_t code = TSDB_CODE_INVALID_PARA;

  SSingleWorker *pWorker = NULL;
  switch (qtype) {
    case WRITE_QUEUE:
      pWorker = &pMgmt->writeWorker;
      break;
    case QUERY_QUEUE:
      pWorker = &pMgmt->queryWorker;
      break;
    case FETCH_QUEUE:
      pWorker = &pMgmt->fetchWorker;
      break;
    case READ_QUEUE:
      pWorker = &pMgmt->readWorker;
      break;
    case ARB_QUEUE:
      pWorker = &pMgmt->arbWorker;
      break;
    case SYNC_QUEUE:
      pWorker = &pMgmt->syncWorker;
      break;
    case SYNC_RD_QUEUE:
      pWorker = &pMgmt->syncRdWorker;
      break;
    default:
      // pWorker stays NULL; code already holds TSDB_CODE_INVALID_PARA.
      break;
  }

  if (pWorker == NULL) return code;

  // Wrap the RPC message in a queue item; the item owns pCont from here on.
  SRpcMsg *pMsg = NULL;
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) return code;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
         pRpc->contLen);
  code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
  if (code != 0) {
    // Enqueue failed: we still own the copy, so release it and its content.
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
207

208
// Initialize all mnode worker queues: query, fetch, read, write, sync,
// sync-rd, and arb. Returns 0 on success or the first initialization error.
// NOTE(review): workers created before a later failure are not torn down
// here — presumably the caller invokes mmStopWorker on error; confirm.
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t          code = 0;
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
      .poolType = QUERY_AUTO_QWORKER_POOL,
  };
  if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
    dError("failed to start mnode-query worker since %s", tstrerror(code));
    return code;
  }

  // Account the mnode query threads in the global query-thread total.
  tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;

  SSingleWorkerCfg fCfg = {
      .min = tsNumOfMnodeFetchThreads,
      .max = tsNumOfMnodeFetchThreads,
      .name = "mnode-fetch",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
    dError("failed to start mnode-fetch worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
    dError("failed to start mnode-read worker since %s", tstrerror(code));
    return code;
  }

  // Write, sync, sync-rd, and arb queues are single-threaded to preserve
  // message ordering.
  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
    dError("failed to start mnode-write worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
    // Fixed duplicated wording ("mnode mnode-sync") for consistency with the
    // other worker error messages above.
    dError("failed to start mnode-sync worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg scCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync-rd",
      .fp = (FItem)mmProcessSyncMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
    dError("failed to start mnode-sync-rd worker since %s", tstrerror(code));
    return code;
  }

  SSingleWorkerCfg arbCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-arb",
      .fp = (FItem)mmProcessRpcMsg,
      .param = pMgmt,
  };
  if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
    dError("failed to start mnode-arb worker since %s", tstrerror(code));
    return code;
  }

  dDebug("mnode workers are initialized");
  return code;
}
300

301
// Shut down all mnode workers.
// Spin-waits until every reference taken via mmAcquire has been released
// (mmAcquire refuses new references once `stopped` is set, so the count can
// only drain), then cleans up each worker queue in turn.
void mmStopWorker(SMnodeMgmt *pMgmt) {
  for (;;) {
    if (pMgmt->refCount <= 0) break;
    taosMsleep(10);
  }

  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->fetchWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->arbWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
  tSingleWorkerCleanup(&pMgmt->syncRdWorker);
  dDebug("mnode workers are closed");
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc