• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.86
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
/*
 * Send an RPC response. A non-zero status from rpcSendResponse is logged
 * and otherwise ignored; nothing is reported back to the caller.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  int32_t sendCode = rpcSendResponse(pMsg);
  if (sendCode == 0) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
29✔
27

28
/*
 * Build the payload of a redirect response from the mnode endpoint set
 * returned by dmGetMnodeEpSetForRedirect.
 *
 * On success pMsg->pCont holds the buffer filled by tSerializeSEpSet and
 * pMsg->contLen its length. On every failure path pMsg->pCont is NULL and
 * pMsg->code carries the error, so the caller never sends a half-built body.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The single endpoint is this dnode itself: this dnode is not the mnode (or not the mnode leader) and could
    // not obtain a valid epSet. Report "not found" instead of returning an incorrect or misleading epSet.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First call only computes the required size; a negative value is an error code and must not reach
  // rpcMallocCont (the same convention is already applied to the second call below).
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a buffer with undefined content attached to a failed response.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
/*
 * Dispatch a queued message to the handler registered for its type in the
 * given wrapper.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the
 * wrapper has no handler for pMsg->msgType.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // NOTE(review): `trace` is not used directly; presumably the dG* log macros pick it up by name.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
/*
 * Fail-fast predicate installed as SRpcInit.ffp: true only for sync
 * heartbeat and sync append-entries messages. More types may be added later.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      return false;
  }
}
80

81
/*
 * Map TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
 * strictly inside the VND or SCH ranges; every other code is returned as is.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    const bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    const bool inSchRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
/*
 * Apply an IP white-list update carried in pRpc to the transport pTrans and
 * record its version in pData->ipWhiteVer.
 *
 * pRpc->pCont is released on every path, including a failed decode, which
 * previously returned without freeing it.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): a partially decoded ipWhite may also own memory here — confirm whether
    // tFreeSUpdateIpWhiteReq is safe to call after a failed decode.
    rpcFreeCont(pRpc->pCont);
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // The result used to be dropped silently. The version is still recorded below, as before.
    dError("failed to set rpc ip-white, code:0x%x", code);
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
227,286✔
107
  if (forbidden) {
227,286!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
227,286✔
116
  }
117
}
118

119
/*
 * Decode an analytic-algorithm response and, on success, pass its version
 * and hash to taosAnalyUpdate. The decoded response and pRpc->pCont are
 * released on every path.
 */
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp algoRsp = {0};

  if (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0) {
    taosAnalyUpdate(algoRsp.ver, algoRsp.hash);
    // Cleared before the free below; presumably taosAnalyUpdate now owns the hash — confirm.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
×
128

129
/*
 * RPC callback shared by the dnode server and client transports (installed
 * as SRpcInit.cfp).
 *
 * Flow:
 *   1. Check client/server version compatibility and the forbidden-IP flag.
 *   2. Handle a few message types inline (net test, query-worker responses,
 *      mnode status / ip-white / analytic-algorithm responses).
 *   3. Reject everything else while the dnode is not running.
 *   4. Pick the target wrapper (optionally by the vgId in the message head),
 *      copy the message into a queue item and hand it to dmProcessNodeMsg.
 *
 * On any failure a response carrying the error code is sent for requests,
 * the queue item (if any) is freed and pRpc->pCont is released.
 *
 * pDnode may be NULL (TD-22618), so it is never dereferenced before the
 * NULL check below.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = NULL;

  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
           pRpc->info.conn.clientIp);
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pDnode != NULL && pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      if (pDnode == NULL) {
        break;  // falls into the "still starting" path below, which frees pCont
      }
      dmUpdateRpcIpWhite(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      if (pDnode == NULL) {
        break;  // falls into the "still starting" path below, which frees pCont
      }
      dmUpdateAnalyticFunc(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
   * pDnode can be NULL here (TD-22618): at trans.c line 91 the dmProcessRpcMsg
   * callback is installed before the transport's parent pointer is set, so a
   * message that arrives in between is delivered with a NULL pDnode.
   */
  if (pDnode == NULL) {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }
    code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  pHandle = &pDnode->trans.msgHandles[TMSG_INDEX(pRpc->msgType)];
  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head can redirect the message to the qnode, snode or mnode wrapper.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    pWrapper = NULL;  // not marked, so it must not be released at _OVER
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // Requests and responses both use RPC_QITEM; the former request-only branch assigned the same value.
  EQItype itype = RPC_QITEM;
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Both former branches (wrapper set / not set) sent the response and logged a failure identically.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
304

305
/*
 * Populate the message dispatch tables from every node type's handler list.
 *
 * For each handler entry the wrapper's msgFps slot is set; entries that need
 * a vgId check set needCheckVgId on the shared handle, all others set the
 * handle's default node type.
 *
 * Returns 0 on success, -1 if a node type returns no handler array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    int32_t idx = 0;
    while (idx < taosArrayGetSize(pHandlers)) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }
      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;

      ++idx;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}
330

331
/*
 * Send a request through the dnode client transport.
 *
 * While the dnode is not running, non-sync messages (type below
 * TDMT_SYNC_MSG_MIN) are refused: the payload is freed, pMsg->pCont is set
 * to NULL and APP_IS_STARTING / APP_IS_STOPPING is returned.
 *
 * Otherwise the result of rpcSendRequest is returned. A send failure used to
 * be logged and then reported as success (0); it is now propagated, matching
 * dmSendSyncReq.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();

  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    if (pDnode->status == DND_STAT_INIT) {
      code = TSDB_CODE_APP_IS_STARTING;
    } else {
      code = TSDB_CODE_APP_IS_STOPPING;
    }
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
           pMsg->info.handle);
    return code;
  }

  pMsg->info.handle = 0;
  code = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
  if (code != 0) {
    // NOTE(review): confirm callers do not free pMsg->pCont again on a non-zero return.
    dError("failed to send rpc msg:%s, code:0x%x", TMSG_INFO(pMsg->msgType), code);
  }
  return code;
}
353
/*
 * Send a request through the dnode sync transport and return the result of
 * rpcSendRequest.
 *
 * While the dnode is not running, messages with a type below
 * TDMT_SYNC_MSG_MIN are refused: the payload is freed and APP_IS_STARTING or
 * APP_IS_STOPPING is returned.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool sendable = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (sendable) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
371

372
/* Forward to rpcRegisterBrokenLinkArg; its result is deliberately discarded. */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
4,052✔
373

374
/* Forward to rpcReleaseHandle with the same arguments; its result is deliberately discarded. */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
}
3,914✔
377

378
/*
 * Retry predicate installed as SRpcInit.rfp.
 *
 * Returns true when `code` is one of the listed transient errors (network,
 * broken link, mnode not found, not leader, restoring, stopped, starting,
 * stopping) and `msgType` is not one of the excluded scheduler / TTL types.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool transientCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!transientCode) {
    return false;
  }

  const bool excludedType = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                            msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                            msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedType;
}
392
/*
 * Predicate installed as SRpcInit.noDelayFp: true for the message types
 * listed below, false for everything else.
 *
 * The original condition tested TDMT_VND_S3MIGRATE twice; the duplicate is
 * removed. NOTE(review): if the second test was meant to name a different
 * message type, add it here.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  return msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
         msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE;
}
399
/*
 * Open the general-purpose dnode RPC client ("DNODE-CLI") and store it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen fails. A failure to convert
 * td_version is only logged.
 */
int32_t dmInitClient(SDnode *pDnode) {
  // Connection limit derived from the session and thread settings, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  SDnodeTrans *pTrans = &pDnode->trans;
  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
453
/*
 * Open the single-threaded status RPC client ("DNODE-STA-CLI") and store it
 * in pDnode->trans.statusRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen fails. A failure to convert
 * td_version is only logged.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  // Fixed limit, passed through the same [10, 500] clamp as the other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  SDnodeTrans *pTrans = &pDnode->trans;
  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
502

503
/*
 * Open the sync RPC client ("DNODE-SYNC-CLI") and store it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen fails. A failure to convert
 * td_version is only logged.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  // Half of the general client's derived limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  SDnodeTrans *pTrans = &pDnode->trans;
  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
552

553
/* Close the general RPC client if it is open and clear the stored handle. */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
70✔
561
/* Close the status RPC client if it is open and clear the stored handle. */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
70✔
569
/* Close the sync RPC client if it is open and clear the stored handle. */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
70✔
577

578
/*
 * Open the dnode RPC server ("DND-S") on tsLocalFqdn:tsServerPort and store
 * it in pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen fails. A failure to convert
 * td_version is only logged.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
  };
  // localFqdn is filled by copy, so it is set after the initializer.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  SDnodeTrans *pTrans = &pDnode->trans;
  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
607

608
/* Close the RPC server if it is open and clear the stored handle. */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
70✔
616

617
/*
 * Build the message-callback table handed to node modules: the four RPC
 * handles currently stored in pDnode->trans, the dnode send/release/report
 * helpers, and a pointer to pDnode->data. Returned by value.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {0};

  // Transport handles as they are at the time of the call.
  msgCb.clientRpc = pDnode->trans.clientRpc;
  msgCb.serverRpc = pDnode->trans.serverRpc;
  msgCb.statusRpc = pDnode->trans.statusRpc;
  msgCb.syncRpc = pDnode->trans.syncRpc;

  // Callbacks.
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendSyncReqFp = dmSendSyncReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;
  msgCb.reportStartupFp = dmReportStartup;
  msgCb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  msgCb.getDnodeEpFp = dmGetDnodeEp;

  msgCb.data = &pDnode->data;
  return msgCb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc