• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4271

10 Jun 2025 09:45AM UTC coverage: 62.985% (+0.002%) from 62.983%
#4271

push

travis-ci

web-flow
Merge pull request #31337 from taosdata/newtest_3.0

fix TD-35057 and TD-35346

158179 of 319671 branches covered (49.48%)

Branch coverage included in aggregate %.

243860 of 318637 relevant lines covered (76.53%)

18624660.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.68
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
// Hand a response to the RPC layer. A non-zero result from rpcSendResponse is
// only logged; nothing is reported back to the caller.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
19,177✔
27

28
// Fill pMsg with a serialized mnode epSet so the peer can be redirected.
//
// pDnode: dnode whose data is consulted for the mnode epSet.
// pMsg:   response being built; on success pCont/contLen carry the serialized
//         epSet. On any failure pCont is NULL and pMsg->code holds the reason.
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The only endpoint is this dnode itself: redirecting the peer back here
    // would be misleading, so report that no mnode was found instead.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the required length; do not
  // feed a negative (error) length into the allocator.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a half-written buffer attached to the response.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
// Dispatch pMsg to the handler registered on pWrapper for its message type.
// Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the
// wrapper has no handler for this type.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // NOTE(review): `trace` is not referenced explicitly below; presumably the
  // dG* logging macros pick it up by name, so it must not be renamed.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
// Returns true for the message types that opt in to fail-fast handling:
// sync heartbeat and sync append-entries. More types may be added later.
static bool dmFailFastFp(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
80

81
// Map TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types in
// the vnode or scheduler ranges; every other (msgType, code) pair is returned
// unchanged.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    bool inSchedRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchedRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
// Apply an ip-white-list update carried in pRpc to the transport pTrans and
// record its version in pData->ipWhiteVer.
//
// This function releases pRpc->pCont on every path, including a
// deserialization failure, where it used to leak.
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): ipWhite may hold partially decoded data here; confirm
    // whether tFreeSUpdateIpWhiteDualReq is safe to call on it as well.
    rpcFreeCont(pRpc->pCont);
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // Previously this result was dropped silently.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  // The version is recorded regardless of the rpcSetIpWhite result, as before.
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
// Returns true (and logs the user and address) when `forbidden` is non-zero,
// false otherwise. clientIp is only used to format the log line.
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
  if (!forbidden) {
    return false;
  }

  SIpV4Range ipRange = {.ip = clientIp, .mask = 32};
  char       ipText[36] = {0};

  (void)rpcUtilSIpRangeToStr(&ipRange, ipText);
  dError("User:%s host:%s not in ip white list", user, ipText);
  return true;
}
118

119
// Decode an analytic-algorithm response from pRpc and, on success, pass its
// version and hash to taosAnalyUpdate. The decoded response and pRpc->pCont
// are released on every path. pData and pTrans are not used here.
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp algoRsp = {0};

  bool decoded = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0);
  if (decoded) {
    taosAnalyUpdate(algoRsp.ver, algoRsp.hash);
    // Cleared so the free below does not touch the hash just handed over.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
2✔
128

129
// RPC callback: validate an incoming message, pick the node wrapper that should
// handle it, copy it into a queue item and dispatch it via dmProcessNodeMsg.
//
// pDnode: owning dnode; may still be NULL during start-up (see TD-22618 below).
// pRpc:   incoming message. On any failure its pCont is freed here and, for
//         requests, an error response is sent.
// pEpSet: optional epSet; only consulted for TDMT_MND_STATUS_RSP.
//
// A wrapper is released at _OVER only if it was successfully marked; every
// error path taken before dmMarkWrapper succeeds leaves pWrapper NULL.
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  // NOTE(review): presumably picked up by name inside the dG* log macros.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3);
  if (code != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  if (dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, 0)) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // Message types that are fully handled here and never reach a node queue.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode == NULL) {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }
    code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  }
  // A body-less response that carries a network error is logged but still dispatched.
  if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
      (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      // The wrapper has not been marked yet, so it must not be released at
      // _OVER (same rule as the dmMarkWrapper failure path below).
      pWrapper = NULL;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // Requests and responses use the same queue-item type; rsp msg is not
  // restricted by tsQueueMemoryUsed.
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }
      // Sends the response and logs on failure, whether or not a wrapper is held.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
307

308
// Build the message dispatch tables: for every node type, fetch its handler
// list and register each handler both in the wrapper's msgFps table and in the
// transport's per-message-type handle table.
// Returns 0 on success, -1 if a node type returns no handler array.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandlers); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        // Only ever raises the flag; a later entry without it does not clear it.
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}
333

334
// Send a request through the dnode's client RPC.
//
// When the dnode is not running and the message type is below
// TDMT_SYNC_MSG_MIN, the request is rejected: pMsg->pCont is freed and
// TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
// Otherwise the request is sent and 0 is returned; a send failure is only
// logged.
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool rejected = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    pMsg->info.handle = 0;
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
      dError("failed to send rpc msg");
    }
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
356
// Send a request through the dnode's sync RPC.
//
// Rejection rules match dmSendReq (dnode not running and message type below
// TDMT_SYNC_MSG_MIN): pMsg->pCont is freed and a starting/stopping code is
// returned. Otherwise the result of rpcSendRequest is returned as is.
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool rejected = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
374

375
// Void-returning adapter over rpcRegisterBrokenLinkArg; its result is
// deliberately discarded.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
11,139,096✔
376

377
// Void-returning adapter over rpcReleaseHandle; its result is deliberately
// discarded.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
  return;
}
11,141,634✔
380

381
// Retry filter installed as SRpcInit.rfp: returns true when `code` is one of
// the listed transient errors and `msgType` is not one of the excluded
// scheduler / TTL message types.
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool transientCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!transientCode) {
    return false;
  }

  bool excludedType = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                      msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                      msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedType;
}
395
// Predicate installed as SRpcInit.noDelayFp: returns true for the message
// types listed below, false for everything else.
//
// NOTE(review): the original condition compared against TDMT_VND_S3MIGRATE
// twice. The duplicate is dropped here (no behavior change); confirm whether
// the second comparison was meant to name a different message type.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  return msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
         msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE;
}
402
// Open the general-purpose client RPC ("DNODE-CLI") and store it in
// pDnode->trans.clientRpc. Returns 0 on success, terrno if rpcOpen fails.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Connection limit derived from the session/thread settings, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  // Members not named here are zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged but does not abort initialization.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
457
// Open the single-threaded status client RPC ("DNODE-STA-CLI") and store it in
// pDnode->trans.statusRpc. Returns 0 on success, terrno if rpcOpen fails.
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed limit; the clamp to [10, 500] mirrors the other client initializers.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  // Members not named here are zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged but does not abort initialization.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
507

508
// Open the sync client RPC ("DNODE-SYNC-CLI") and store it in
// pDnode->trans.syncRpc. Returns 0 on success, terrno if rpcOpen fails.
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's derived limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  // Members not named here are zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A version-parse failure is logged but does not abort initialization.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
558

559
// Close the general client RPC if it is open and clear the stored handle.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
2,798✔
567
// Close the status client RPC if it is open and clear the stored handle.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
2,798✔
575
// Close the sync client RPC if it is open and clear the stored handle.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
2,798✔
583

584
// Open the server-side RPC ("DND-S") on tsLocalFqdn:tsServerPort and store it
// in pDnode->trans.serverRpc. Returns 0 on success, terrno if rpcOpen fails.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Members not named here are zero-initialized.
  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
      .ipv6 = tsEnableIpv6,
  };
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  // A version-parse failure is logged but does not abort initialization.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
616

617
// Close the server RPC if it is open and clear the stored handle.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
2,798✔
625

626
// Build, by value, the callback table handed to node modules: the four RPC
// handles currently stored in pDnode->trans, the send/release helpers defined
// in this file, and a pointer to pDnode->data.
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  return (SMsgCb){
      .clientRpc = pTrans->clientRpc,
      .serverRpc = pTrans->serverRpc,
      .statusRpc = pTrans->statusRpc,
      .syncRpc = pTrans->syncRpc,
      .sendReqFp = dmSendReq,
      .sendSyncReqFp = dmSendSyncReq,
      .sendRspFp = dmSendRsp,
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
      .releaseHandleFp = dmReleaseHandle,
      .reportStartupFp = dmReportStartup,
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
      .getDnodeEpFp = dmGetDnodeEp,
      .data = &pDnode->data,
  };
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc