• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4324

18 Jun 2025 07:25AM UTC coverage: 62.916% (-0.2%) from 63.116%
#4324

push

travis-ci

web-flow
docs: add IPv6 support information for taosAdapter (#31362)

158158 of 319881 branches covered (49.44%)

Branch coverage included in aggregate %.

243705 of 318846 relevant lines covered (76.43%)

17827866.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.44
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
// Hands a response to the RPC layer. A non-zero result from
// rpcSendResponse() is only logged; it is not reported to the caller.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  int32_t ret = rpcSendResponse(pMsg);
  if (ret == 0) return;

  dError("failed to send response, msg:%p", pMsg);
}
19,054✔
27

28
/*
 * Builds the body of a redirect response that tells the peer where the mnode is.
 *
 * The endpoint set comes from dmGetMnodeEpSetForRedirect() and is serialized
 * into a buffer obtained from rpcMallocCont(), which is attached to pMsg.
 *
 * Outcomes:
 *   - no usable endpoint: pMsg->pCont = NULL, pMsg->code = TSDB_CODE_MNODE_NOT_FOUND
 *   - allocation failure: pMsg->code = TSDB_CODE_OUT_OF_MEMORY
 *   - serialization failure: pMsg->pCont = NULL, pMsg->code = the negative result
 *   - success: pMsg->pCont / pMsg->contLen hold the serialized endpoint set
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The single known endpoint is this dnode itself (same FQDN and port), so
    // there is no other mnode to redirect to. Report "mnode not found" rather
    // than returning an epSet that would mislead the peer.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the required length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Never pass a negative length to the allocator.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a half-written buffer attached to the response.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
// Dispatches pMsg to the handler registered in pWrapper->msgFps for its
// message type.
//
// Returns TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered; otherwise
// stores pWrapper in pMsg->info.wrapper and returns the handler's result.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Kept as a local named `trace`: presumably the dG* log macros pick it up.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
// Fail-fast predicate installed as SRpcInit.ffp: returns true only for the
// sync heartbeat and sync append-entries message types.
static bool dmFailFastFp(tmsg_t msgType) {
  // add more msg type later
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      return false;
  }
}
80

81
// Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
// that fall strictly inside the vnode or scheduler ranges. Every other
// code/type combination is returned unchanged.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    bool isVnodeMsg = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    bool isSchedMsg = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (isVnodeMsg || isSchedMsg) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
/*
 * Applies an IP white-list update carried in a response message.
 *
 * Deserializes pRpc->pCont into an SUpdateIpWhite, passes it to
 * rpcSetIpWhite() on pTrans, and records the list version in
 * pData->ipWhiteVer. pRpc->pCont is released on every path, because the
 * caller returns immediately after this function without freeing it.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): ipWhite may be partially filled here; confirm whether
    // tFreeSUpdateIpWhiteDualReq() is safe to call after a failed decode.
    rpcFreeCont(pRpc->pCont);  // previously leaked on this path
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // The version is still recorded below, as before; the failure is now logged.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
// Converts the `forbidden` flag into a bool. When the flag is set, the user
// and client address are logged as not being in the IP white list.
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (!forbidden) {
    return false;
  }

  dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
  return true;
}
114

115
// Handles an analytic-algorithm response: decodes it, forwards the version
// and hash to taosAnalyUpdate() when decoding succeeds, then releases the
// decoded response and the message body. pData and pTrans are not used.
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp rsp = {0};

  int32_t ret = tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp);
  if (ret == 0) {
    taosAnalyUpdate(rsp.ver, rsp.hash);
    // Cleared before the free below; presumably taosAnalyUpdate() now owns
    // the hash -- TODO confirm.
    rsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
2✔
124

125
/*
 * RPC callback installed as SRpcInit.cfp for the dnode server and client
 * transports.
 *
 * Flow:
 *   1. Check client/server version compatibility and the forbidden-IP flag.
 *   2. Handle a fixed set of message types inline (net test, query-worker
 *      responses, mnode status / white-list / analytic-algorithm responses).
 *      Most of these return directly and skip the _OVER cleanup.
 *   3. Reject the message while the dnode is absent or not running.
 *   4. Select the target node wrapper (optionally by the vgId in the message
 *      head), copy the message into a queue item and dispatch it through
 *      dmProcessNodeMsg().
 *   5. On any failure (_OVER with code != 0): answer requests with an error
 *      response, free the queue item and release pRpc->pCont.
 *
 * pDnode  owning dnode; may be NULL during startup (see TD-22618 note below)
 * pRpc    incoming message
 * pEpSet  endpoint set from the RPC layer; applied only for
 *         TDMT_MND_STATUS_RSP when non-NULL
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  // NOTE(review): pTrans/pHandle are derived from pDnode before the NULL check
  // further down, and the response cases in the switch read through them.
  // Confirm those responses cannot arrive while pDnode is still NULL.
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // Message types handled here instead of being queued to a node wrapper.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
      // The result is stored but not acted upon; the function returns directly.
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;  // continues with the normal dispatch below
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode may be NULL here (TD-22618):
  at trans.c line 91, the dmProcessRpcMsg callback is set before that line
  and the parent is set after it, so when dmProcessRpcMsg is called
  pDnode can still be NULL.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; an empty response carrying a network error is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head can redirect the message to the
      // qnode / snode / mnode wrapper instead of the default one.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Marking failed, so the wrapper must not be released at the end.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // Every message is allocated as RPC_QITEM. The former conditional here
  // assigned the same value in both branches and has been removed.
  // NOTE(review): the old comment said "rsp msg is not restricted by
  // tsQueueMemoryUsed", which hints that responses (or sync heartbeats) were
  // once meant to use a different item type -- confirm the intent.
  EQItype itype = RPC_QITEM;
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      // The response type is the request type + 1.
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Both former branches (wrapper set / not set) sent the response the
      // same way and logged the same message, so one call covers both.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
303

304
// Populates the message dispatch tables from every node type's handler list.
//
// For each node type from DNODE up to (not including) NODE_END, the wrapper's
// getHandlesFp() supplies an array of SMgmtHandle entries. Each entry
//   - registers its msgFp in the wrapper's msgFps table, and
//   - either propagates needCheckVgId to the shared handle, or (when the flag
//     is not set) makes this node type the handle's defaultNtype.
//
// Returns 0 on success, -1 if a handler list could not be obtained.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandlers); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}
329

330
// Sends a request through the dnode's client RPC.
//
// While the dnode is not running, only message types at or above
// TDMT_SYNC_MSG_MIN are let through. Anything else has its body freed and the
// call returns TSDB_CODE_APP_IS_STARTING or TSDB_CODE_APP_IS_STOPPING.
//
// NOTE(review): a non-zero result from rpcSendRequest() is logged but the
// function still returns 0, unlike dmSendSyncReq() -- confirm this is intended.
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool canSend = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (canSend) {
    pMsg->info.handle = 0;
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
      dError("failed to send rpc msg");
    }
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
352
// Sends a request through the dnode's sync RPC and returns the result of
// rpcSendRequest().
//
// While the dnode is not running, only message types at or above
// TDMT_SYNC_MSG_MIN are let through. Anything else has its body freed and the
// call returns TSDB_CODE_APP_IS_STARTING or TSDB_CODE_APP_IS_STOPPING.
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool canSend = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (canSend) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
370

371
// Adapter for SMsgCb.registerBrokenLinkArgFp: forwards to
// rpcRegisterBrokenLinkArg() and deliberately discards its result.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
11,000,054✔
372

373
// Adapter for SMsgCb.releaseHandleFp: forwards all three arguments to
// rpcReleaseHandle() and deliberately discards its result.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
}
11,004,281✔
376

377
// Predicate installed as SRpcInit.rfp by the client initializers.
//
// Returns true when `code` is one of the listed network / leadership /
// startup-shutdown errors AND `msgType` is not one of the excluded scheduler
// or TTL message types. Returns false otherwise.
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool codeMatches = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                     code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                     code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING ||
                     code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
                     code == TSDB_CODE_APP_IS_STOPPING;
  if (!codeMatches) {
    return false;
  }

  bool typeExcluded = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY ||
                      msgType == TDMT_VND_DROP_TTL_TABLE;
  return !typeExcluded;
}
391
// Predicate installed as SRpcInit.noDelayFp: returns true for the message
// types listed below and false for everything else.
//
// NOTE(review): the original condition tested TDMT_VND_S3MIGRATE twice. The
// redundant test is removed here; if a different message type was intended in
// its place, add it to the list.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
    return true;
  }
  return false;
}
398
// Opens the general-purpose client RPC ("DNODE-CLI") and stores it in
// pDnode->trans.clientRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Connection limit derived from the configured sessions, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged only; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
453
// Opens the single-threaded status client RPC ("DNODE-STA-CLI") and stores it
// in pDnode->trans.statusRpc. The read timer is disabled for this client.
//
// Returns 0 on success, or terrno when rpcOpen() fails.
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed limit, passed through the same [10, 500] clamp as the other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged only; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
503

504
// Opens the sync client RPC ("DNODE-SYNC-CLI") and stores it in
// pDnode->trans.syncRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails.
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's per-thread share, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged only; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
554

555
// Closes the general client RPC if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
2,691✔
563
// Closes the status client RPC if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
2,691✔
571
// Closes the sync client RPC if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
2,691✔
579

580
// Opens the dnode's server RPC ("DND-S") on tsLocalFqdn:tsServerPort and
// stores it in pDnode->trans.serverRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
      .ipv6 = tsEnableIpv6,
  };
  // localFqdn is copied rather than assigned, so it is filled in after the
  // initializer.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  // A version-parse failure is logged only; the server is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
612

613
// Closes the server RPC if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
2,691✔
621

622
// Builds, by value, the callback table handed to the node modules: the four
// RPC handles currently stored in pDnode->trans, the send / release / report
// adapters, and a pointer to pDnode->data.
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SMsgCb callbacks = {
      // RPC handles as they are at the time of the call
      .clientRpc = pTrans->clientRpc,
      .serverRpc = pTrans->serverRpc,
      .statusRpc = pTrans->statusRpc,
      .syncRpc = pTrans->syncRpc,
      // messaging adapters
      .sendReqFp = dmSendReq,
      .sendSyncReqFp = dmSendSyncReq,
      .sendRspFp = dmSendRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
      // dnode-level services
      .reportStartupFp = dmReportStartup,
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
      .getDnodeEpFp = dmGetDnodeEp,
      .data = &pDnode->data,
  };
  return callbacks;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc