• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.18
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
// Hands a response to the RPC layer. A non-zero result from rpcSendResponse is
// only logged; nothing is reported back to the caller.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  int32_t ret = rpcSendResponse(pMsg);
  if (ret == 0) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
5,974✔
27

28
// Fills pMsg with a serialized mnode epSet so the requester can be redirected.
//
// On success pMsg->pCont / pMsg->contLen carry the serialized epSet. On every
// failure path pMsg->pCont is left NULL and pMsg->code describes the problem:
//   - no usable epSet (empty, or the only entry is this dnode itself):
//     TSDB_CODE_MNODE_NOT_FOUND
//   - allocation failure: TSDB_CODE_OUT_OF_MEMORY
//   - serialization failure: the negative value returned by tSerializeSEpSet
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The single endpoint is this dnode itself: redirecting the client back to
    // us would be misleading, so report that no mnode could be found instead.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass only computes the encoded size; a negative value is an error
  // and must not be handed to the allocator as a length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a buffer attached without a valid contLen.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
// Dispatches pMsg to the handler that pWrapper registered for its message type.
// Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the wrapper
// has no handler for this type.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // NOTE(review): `trace` is not referenced directly; presumably the dG* logging
  // macros pick it up by name, so the identifier must stay as is.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
// Fail-fast predicate installed as rpcInit.ffp by the client init functions:
// true only for sync heartbeat and append-entries messages.
static bool dmFailFastFp(tmsg_t msgType) {
  // add more msg type later
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      return false;
  }
}
80

81
// Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types that
// lie strictly inside the VND or SCH ranges; every other code is returned as is.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    bool inVndRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    bool inSchRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVndRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
// Handles the dual-stack ip-white-list response: decodes the list from
// pRpc->pCont, installs it on the given transport and records its version in
// pData->ipWhiteVer.
//
// The message content is always released here (the caller returns right after
// this call), including when decoding fails.
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    goto _OVER;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // Previously this failure was dropped silently; at least make it visible.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  // NOTE(review): the version is recorded even when rpcSetIpWhite fails, as in
  // the original code - confirm whether a failed install should be retried.
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

_OVER:
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
106

107
// Handler for the non-dual ip-white-list response: the payload is never applied.
// It logs TSDB_CODE_INVALID_MSG, releases the message content and clears the
// pointer.
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  dError("failed to update rpc ip-white since: %s", tstrerror(TSDB_CODE_INVALID_MSG));

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
114
// Returns true (and logs the user and client address) when the `forbidden` flag
// is non-zero; returns false otherwise.
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (!forbidden) {
    return false;
  }

  dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
  return true;
}
122

123
// Handles the analytic-algorithm response: on a successful decode the version
// and hash are passed to taosAnalyUpdate. The decoded response and the message
// content are released in every case.
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp algoRsp = {0};

  int32_t ret = tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp);
  if (ret == 0) {
    taosAnalyUpdate(algoRsp.ver, algoRsp.hash);
    // Detach the hash before the free below; presumably taosAnalyUpdate has
    // taken ownership of it - confirm against its implementation.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
2✔
132

133
// RPC callback (installed as rpcInit.cfp by the init functions in this file).
//
// Flow:
//   1. reject clients with an incompatible version or a forbidden ip;
//   2. handle a few message types inline (net test, query-worker responses,
//      ip-white-list / analytic-algo responses) and return;
//   3. reject everything else while the dnode is not running;
//   4. pick the target management wrapper (optionally by the vgId in the
//      message head), mark it, copy the message into a queue item and dispatch
//      it through dmProcessNodeMsg.
//
// On any failure (_OVER with code != 0) a response carrying the (converted)
// error code is sent for requests, the queue item is freed and pRpc->pCont is
// released. A wrapper is released at the end only if it was marked.
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  // NOTE(review): pDnode may be NULL here (see the TD-22618 note below); these
  // two lines only compute addresses, but the inline cases of the switch that
  // touch pDnode/pTrans run before the NULL check - worth confirming.
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // Message types that are handled inline and never reach a node queue.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Log only: a response that failed at the network level is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head may redirect the message to another node type.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      // The wrapper has not been marked yet, so it must not be released at
      // _OVER (same rule as the dmMarkWrapper failure path below).
      pWrapper = NULL;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
  if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    itype = DEF_QITEM;
  } else if (IsReq(pRpc)) {
    itype = APPLY_QITEM;
  }
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      // The response type is the request type + 1.
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      if (pWrapper != NULL) {
        dmSendRsp(&rsp);
      } else {
        if (rpcSendResponse(&rsp) != 0) {
          dError("failed to send response, msg:%p", &rsp);
        }
      }
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
314

315
// Builds the message dispatch tables. For every node type from DNODE up to (not
// including) NODE_END, the wrapper's getHandlesFp supplies a list of handles;
// each one records its handler in the wrapper's msgFps table and updates the
// per-message entry in pDnode->trans.msgHandles.
// Returns 0 on success, -1 if any wrapper returns a NULL handle list.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        // Routed by vgId at dispatch time; the default node type is untouched.
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
340

341
// Sends a request through the dnode's client RPC.
//
// While the dnode is not running, messages whose type is below
// TDMT_SYNC_MSG_MIN are refused: the content is freed, pMsg->pCont is cleared,
// and TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
// Otherwise the request is handed to rpcSendRequest and 0 is returned; a send
// failure is only logged.
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool sendable = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (sendable) {
    pMsg->info.handle = 0;
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
      dError("failed to send rpc msg");
    }
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
363
// Sends a request through the dnode's sync RPC and returns rpcSendRequest's
// result.
//
// While the dnode is not running, messages whose type is below
// TDMT_SYNC_MSG_MIN are refused: the content is freed, pMsg->pCont is cleared,
// and TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool sendable = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (sendable) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
381

382
// Void adapter over rpcRegisterBrokenLinkArg (used as a callback in dmGetMsgcb);
// the result is deliberately discarded.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
3,096,858✔
383

384
// Void adapter over rpcReleaseHandle (used as a callback in dmGetMsgcb); the
// result is deliberately discarded.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
  return;
}
3,093,498✔
387

388
// Predicate installed as rpcInit.rfp by the client init functions. Returns true
// when `code` is one of the listed network / leadership / startup-shutdown
// errors, unless the message is one of the scheduler query/fetch/notify types
// or TDMT_VND_DROP_TTL_TABLE, for which it always returns false.
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool matchesCode = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                     code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                     code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING ||
                     code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
                     code == TSDB_CODE_APP_IS_STOPPING;
  if (!matchesCode) {
    return false;
  }

  switch (msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
    case TDMT_SCH_TASK_NOTIFY:
    case TDMT_VND_DROP_TTL_TABLE:
      return false;
    default:
      return true;
  }
}
402
// Predicate installed as rpcInit.noDelayFp in dmInitClient: true for the listed
// vnode maintenance message types, false for everything else.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_SSMIGRATE:
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
    case TDMT_VND_DROP_TTL_TABLE:
    case TDMT_VND_FOLLOWER_SSMIGRATE:
      return true;
    default:
      return false;
  }
}
410
// Opens the general-purpose client RPC ("DNODE-CLI") and stores it in
// pDnode->trans.clientRpc. Returns 0 on success, or terrno when rpcOpen fails.
// A failure to convert td_version is logged but does not abort initialization.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
465
// Opens the single-threaded status client RPC ("DNODE-STA-CLI") and stores it
// in pDnode->trans.statusRpc. Returns 0 on success, or terrno when rpcOpen
// fails. A failure to convert td_version is logged but does not abort
// initialization.
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed limit, passed through the same [10, 500] clamp as the other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
515

516
// Opens the sync client RPC ("DNODE-SYNC-CLI") and stores it in
// pDnode->trans.syncRpc. Returns 0 on success, or terrno when rpcOpen fails.
// A failure to convert td_version is logged but does not abort initialization.
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
566

567
// Closes the client RPC if it is open and clears the stored handle.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
3,068✔
575
// Closes the status client RPC if it is open and clears the stored handle.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
3,068✔
583
// Closes the sync client RPC if it is open and clears the stored handle.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
3,068✔
591

592
// Opens the server-side RPC ("DND-S") bound to tsLocalFqdn:tsServerPort and
// stores it in pDnode->trans.serverRpc. Returns 0 on success, or terrno when
// rpcOpen fails. A failure to convert td_version is logged but does not abort
// initialization.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
      .ipv6 = tsEnableIpv6,
  };
  // localFqdn is copied with tstrncpy rather than set in the initializer.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
624

625
// Closes the server RPC if it is open and clears the stored handle.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
3,068✔
633

634
// Builds, by value, the message callback table: the four RPC handles currently
// stored in pDnode->trans, the send/release/report helpers, and a pointer to
// pDnode->data. All fields not listed here are zero.
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb cb = {0};

  // Transport handles (snapshot of the current pointers).
  cb.clientRpc = pDnode->trans.clientRpc;
  cb.serverRpc = pDnode->trans.serverRpc;
  cb.statusRpc = pDnode->trans.statusRpc;
  cb.syncRpc = pDnode->trans.syncRpc;

  // Callbacks.
  cb.sendReqFp = dmSendReq;
  cb.sendSyncReqFp = dmSendSyncReq;
  cb.sendRspFp = dmSendRsp;
  cb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  cb.releaseHandleFp = dmReleaseHandle;
  cb.reportStartupFp = dmReportStartup;
  cb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  cb.getDnodeEpFp = dmGetDnodeEp;

  cb.data = &pDnode->data;
  return cb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc