• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.76
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
/*
 * Hand a response message to the RPC layer.
 *
 * A non-zero result from rpcSendResponse() is only logged; nothing is
 * reported back to the caller.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
63✔
27

UNCOV
28
/*
 * Fill `pMsg` with a redirect response that carries the serialized mnode
 * endpoint set obtained from dmGetMnodeEpSetForRedirect().
 *
 * On success `pMsg->pCont` / `pMsg->contLen` hold the serialized SEpSet.
 * On any failure `pMsg->pCont` is NULL and `pMsg->code` is set:
 *   - TSDB_CODE_MNODE_NOT_FOUND  no usable endpoint (empty set, or the only
 *                                endpoint is this dnode itself)
 *   - TSDB_CODE_OUT_OF_MEMORY    rpcMallocCont() failed
 *   - negative serializer code   tSerializeSEpSet() failed
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The single endpoint is this dnode itself, so this dnode is not the mnode (or not the mnode leader) and has
    // no better target to offer. Report "not found" instead of returning a misleading epSet that points back here.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the required length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Do not pass a negative length to the allocator.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Drop the half-written buffer so the response matches the other error paths (pCont == NULL).
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
/*
 * Dispatch `pMsg` to the handler registered in `pWrapper->msgFps` for its
 * message type.
 *
 * Returns the handler's return value, or TSDB_CODE_MSG_NOT_PROCESSED when the
 * slot for this message type is empty. `pMsg->info.wrapper` is set to
 * `pWrapper` before the handler runs.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* logging macros pick it up by name.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
/*
 * Fail-fast predicate installed as `rpcInit.ffp` by the client initializers.
 * Returns true only for sync heartbeat and sync append-entries messages.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      // add more msg type later
      return false;
  }
}
80

81
/*
 * Translate TSDB_CODE_APP_IS_STOPPING into TSDB_CODE_VND_STOPPED for message
 * types strictly inside the VND or SCH ranges. Every other (msgType, code)
 * combination is returned unchanged.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    const bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    const bool inSchedRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchedRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
UNCOV
91
/*
 * Apply an IP white-list update carried in `pRpc` to the RPC handle `pTrans`
 * and record the list version in `pData->ipWhiteVer`.
 *
 * `pRpc->pCont` is released with rpcFreeCont() on every path, including the
 * deserialization-failure path.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): if the deserializer can leave partial allocations in `ipWhite`, it should be released here
    // too; confirm that tFreeSUpdateIpWhiteReq() is safe on a partially filled struct before adding the call.
    rpcFreeCont(pRpc->pCont);
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // The version is still recorded below, exactly as before; the failure is now at least visible in the log.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
297,586✔
107
  if (forbidden) {
297,586!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
297,586✔
116
  }
117
}
118

119
/*
 * Decode a "retrieve analysis algorithm" response and, when decoding
 * succeeds, pass its version and hash to taosAnalUpdate().
 *
 * The decoded response and `pRpc->pCont` are released on every path.
 * `pData` and `pTrans` are not used by this function.
 */
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalAlgoRsp algoRsp = {0};

  const bool decoded = (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0);
  if (decoded) {
    taosAnalUpdate(algoRsp.ver, algoRsp.hash);
    // Cleared before the free below; presumably taosAnalUpdate() took ownership of the hash (confirm).
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
×
128

129
/*
 * RPC callback installed as `rpcInit.cfp` for the server and all client
 * transports. Validates an incoming message and forwards it to the
 * management node wrapper that owns its message type.
 *
 * Steps, in order:
 *   1. Check client/server version compatibility and the forbidden-IP flag.
 *   2. Handle a few message types inline (net test, query-worker responses,
 *      mnode status / ip-white / analysis-algorithm responses).
 *   3. Reject the message while the dnode is not running (or not yet
 *      attached, see the TD-22618 note below).
 *   4. Pick the target wrapper (optionally by the vgId in the message head),
 *      copy the message into a queue item and call dmProcessNodeMsg().
 *
 * On any failure (`_OVER` with code != 0): a response carrying the error is
 * sent when the message is a request, the queue item (if any) is freed and
 * `pRpc->pCont` is released.
 *
 * `pDnode` may be NULL; every access to it is guarded.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = NULL;  // resolved only after pDnode is known to be non-NULL

  // NOTE(review): `trace` is not referenced directly; presumably the dG* logging macros pick it up by name.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
           pRpc->info.conn.clientIp);
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      // The result is intentionally not acted upon; this function returns right away.
      (void)qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL && pDnode != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      // Without a dnode there is nothing to update; fall through to the NULL handling below,
      // which releases pRpc->pCont via _OVER.
      if (pDnode == NULL) break;
      dmUpdateRpcIpWhite(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      if (pDnode == NULL) break;
      dmUpdateAnalFunc(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  // pDnode is non-NULL from here on.
  pHandle = &pDnode->trans.msgHandles[TMSG_INDEX(pRpc->msgType)];

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; an empty response carrying a network error is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The message head carries the vgId in network byte order; special ids select a non-default wrapper.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Marking failed, so the wrapper must not be released at _OVER.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND && pDnode != NULL) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Same send-and-log behaviour whether or not a wrapper was selected.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
301

302
/*
 * Populate the message dispatch tables from every node wrapper.
 *
 * For each node type in [DNODE, NODE_END) the wrapper's getHandlesFp() is
 * called to obtain an array of SMgmtHandle entries. For every entry:
 *   - entries with needCheckVgId set copy that flag into the shared handle;
 *   - entries without it make this node type the default for the message;
 *   - the wrapper's own msgFps slot receives the handler function.
 *
 * Returns 0 on success, -1 if any wrapper returns a NULL handle array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pItem = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pItem->msgType)];

      if (pItem->needCheckVgId) {
        pSlot->needCheckVgId = pItem->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pItem->msgType)] = pItem->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
327

328
/*
 * Send a request through the dnode's client RPC handle.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are rejected: `pMsg->pCont` is freed and cleared and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 *
 * Otherwise `pMsg->info.handle` is reset to 0 and the request is sent; a send
 * failure is logged and 0 is still returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG_MIN) {
    pMsg->info.handle = 0;
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
      dError("failed to send rpc msg");
    }
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
350
/*
 * Send a request through the dnode's sync RPC handle.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are rejected: `pMsg->pCont` is freed and cleared and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 *
 * Otherwise the result of rpcSendRequest() is returned as is.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG_MIN) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
368

369
/*
 * Void-returning adapter around rpcRegisterBrokenLinkArg(); its result is
 * deliberately discarded.
 */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
35,924✔
370

371
/*
 * Void-returning adapter around rpcReleaseHandle(); its result is
 * deliberately discarded. All three arguments are passed through unchanged.
 */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
}
35,917✔
374

375
/*
 * Retry predicate installed as `rpcInit.rfp` by the client initializers.
 *
 * Returns true when `code` is one of the listed connectivity / leadership /
 * lifecycle errors AND `msgType` is not one of the scheduler query/fetch,
 * task-notify or drop-TTL-table messages. Returns false otherwise.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool linkError = (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) || (code == TSDB_CODE_RPC_BROKEN_LINK) ||
                         (code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED);
  const bool roleError =
      (code == TSDB_CODE_MNODE_NOT_FOUND) || (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  const bool lifecycleError = (code == TSDB_CODE_VND_STOPPED) || (code == TSDB_CODE_APP_IS_STARTING) ||
                              (code == TSDB_CODE_APP_IS_STOPPING);

  if (!(linkError || roleError || lifecycleError)) {
    return false;
  }

  // These message types are excluded even when the error code itself qualifies.
  switch (msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
    case TDMT_SCH_TASK_NOTIFY:
    case TDMT_VND_DROP_TTL_TABLE:
      return false;
    default:
      return true;
  }
}
389
/*
 * No-delay predicate installed as `rpcInit.noDelayFp` by dmInitClient().
 * Returns true for the message types listed below, false for everything else.
 *
 * NOTE(review): the previous condition tested TDMT_VND_S3MIGRATE twice. The
 * duplicate is dropped here with no change in behaviour; if a different
 * message type was intended in its place, add it to the list.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_S3MIGRATE:
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
    case TDMT_VND_DROP_TTL_TABLE:
      return true;
    default:
      return false;
  }
}
396
/*
 * Open the general-purpose client RPC handle ("DNODE-CLI") and store it in
 * `pDnode->trans.clientRpc`.
 *
 * Returns 0 on success, or `terrno` when rpcOpen() fails. A failure to
 * convert `td_version` is logged but does not abort initialization.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // sessions / (threads * 3), then passed through TMAX(.., 10) and TMIN(.., 500).
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
450
/*
 * Open the single-threaded status client RPC handle ("DNODE-STA-CLI") and
 * store it in `pDnode->trans.statusRpc`.
 *
 * Returns 0 on success, or `terrno` when rpcOpen() fails. A failure to
 * convert `td_version` is logged but does not abort initialization.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed starting value, passed through the same TMAX/TMIN bounds as the other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
499

500
/*
 * Open the sync client RPC handle ("DNODE-SYNC-CLI") and store it in
 * `pDnode->trans.syncRpc`.
 *
 * Returns 0 on success, or `terrno` when rpcOpen() fails. A failure to
 * convert `td_version` is logged but does not abort initialization.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's quota, then passed through TMAX(.., 10) and TMIN(.., 500).
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
549

550
/*
 * Close the general client RPC handle if it is open and clear the stored
 * pointer. Does nothing when the handle is already NULL.
 */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
43✔
558
/*
 * Close the status client RPC handle if it is open and clear the stored
 * pointer. Does nothing when the handle is already NULL.
 */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
43✔
566
/*
 * Close the sync client RPC handle if it is open and clear the stored
 * pointer. Does nothing when the handle is already NULL.
 */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
43✔
574

575
/*
 * Open the server-side RPC handle ("DND-S") bound to tsLocalFqdn:tsServerPort
 * and store it in `pDnode->trans.serverRpc`.
 *
 * Returns 0 on success, or `terrno` when rpcOpen() fails. A failure to
 * convert `td_version` is logged but does not abort initialization.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
  };
  // localFqdn is filled by copy rather than by the initializer above.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
604

605
/*
 * Close the server RPC handle if it is open and clear the stored pointer.
 * Does nothing when the handle is already NULL.
 */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
43✔
613

614
/*
 * Build, by value, the callback table handed to the management nodes.
 *
 * It carries the four RPC handles currently stored in `pDnode->trans`, the
 * dnode-level send / release / report callbacks, and a pointer to
 * `pDnode->data`. Members not assigned here are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       cb = {0};

  // RPC handles
  cb.clientRpc = pTrans->clientRpc;
  cb.serverRpc = pTrans->serverRpc;
  cb.statusRpc = pTrans->statusRpc;
  cb.syncRpc = pTrans->syncRpc;

  // callbacks
  cb.sendReqFp = dmSendReq;
  cb.sendSyncReqFp = dmSendSyncReq;
  cb.sendRspFp = dmSendRsp;
  cb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  cb.releaseHandleFp = dmReleaseHandle;
  cb.reportStartupFp = dmReportStartup;
  cb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  cb.getDnodeEpFp = dmGetDnodeEp;

  cb.data = &pDnode->data;
  return cb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc