• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4548

22 Jul 2025 02:37AM UTC coverage: 54.273% (-3.0%) from 57.287%
#4548

push

travis-ci

GitHub
Merge pull request #32061 from taosdata/new_testcases

132738 of 315239 branches covered (42.11%)

Branch coverage included in aggregate %.

201371 of 300373 relevant lines covered (67.04%)

3475977.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.38
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
/**
 * Hand a response message to the RPC layer.
 *
 * A non-zero result from rpcSendResponse() is only logged; nothing is
 * reported back to the caller.
 *
 * @param pMsg response message to send.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  const bool sent = (rpcSendResponse(pMsg) == 0);
  if (sent) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
800✔
27

28
/**
 * Fill a response with the serialized mnode endpoint set so that the peer can
 * redirect its request.
 *
 * On success pMsg->pCont holds the serialized SEpSet and pMsg->contLen its
 * length. On every failure path pMsg->pCont is NULL and pMsg->code carries the
 * reason.
 *
 * @param pDnode dnode whose data is queried for the mnode endpoint set.
 * @param pMsg   response message to fill in; pCont, contLen and code may be written.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The only known endpoint is this dnode itself, and this dnode could not
    // serve the request as an mnode (leader). Redirecting the peer back here
    // would be misleading, so report that no mnode was found instead.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Do not hand a negative length to the allocator.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Release the buffer so the response does not carry a half-written body
    // with a zero contLen.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
/**
 * Dispatch a message to the handler registered for its type in a wrapper.
 *
 * @param pWrapper management wrapper whose msgFps table is consulted.
 * @param pMsg     message to process; info.wrapper is set before dispatch.
 * @return the handler's return value, or TSDB_CODE_MSG_NOT_PROCESSED when the
 *         wrapper has no handler for this message type.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not referenced explicitly below; presumably picked up by name by the
  // dGTrace/dGError macros, so the identifier must stay `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (msgFp != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*msgFp)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
/**
 * Tell whether a message type is one of the fail-fast types.
 *
 * @param msgType message type to classify.
 * @return true for TDMT_SYNC_HEARTBEAT and TDMT_SYNC_APPEND_ENTRIES, false otherwise.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  switch (msgType) {
    // add more msg type later
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      return false;
  }
}
80

81
/**
 * Translate TSDB_CODE_APP_IS_STOPPING into TSDB_CODE_VND_STOPPED for message
 * types in the vnode or scheduler ranges; every other code passes through.
 *
 * @param msgType type of the message that failed.
 * @param code    error code produced while handling it.
 * @return the (possibly translated) error code.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    // Both ranges are exclusive of their MIN/MAX sentinels.
    const bool isVndMsg = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    const bool isSchMsg = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (isVndMsg || isSchMsg) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
/**
 * Apply an IP white-list update carried in an RPC response.
 *
 * The response body is always released before returning, including when it
 * cannot be decoded, and pRpc->pCont is cleared afterwards.
 *
 * @param pData  dnode data whose ipWhiteVer is updated from the decoded list.
 * @param pTrans RPC handle passed to rpcSetIpWhite().
 * @param pRpc   response message; its pCont is consumed by this function.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): ipWhite is not freed here because it is not visible from
    // this file whether the deserializer cleans up after a failed decode.
    goto _OVER;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  // NOTE(review): the version is recorded even when rpcSetIpWhite() fails,
  // as before; confirm whether a failed apply should be retried instead.
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

_OVER:
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
106

107
/**
 * Reject an IP white-list response of this message type: log
 * TSDB_CODE_INVALID_MSG and release the response body.
 *
 * @param pDnode unused.
 * @param pTrans unused.
 * @param pRpc   response message; its pCont is freed and cleared.
 */
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  dError("failed to update rpc ip-white since: %s", tstrerror(TSDB_CODE_INVALID_MSG));

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
114
/**
 * Report whether a connection was flagged as forbidden, logging the user and
 * client address when it was.
 *
 * @param forbidden non-zero when the connection is flagged as forbidden.
 * @param user      user name, used for the log line only.
 * @param clientIp  client address, used for the log line only.
 * @return true when forbidden is non-zero, false otherwise.
 */
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (!forbidden) {
    return false;
  }

  dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
  return true;
}
122

123
/**
 * Apply an analytic-algorithm list carried in an RPC response.
 *
 * @param pData  unused.
 * @param pTrans unused.
 * @param pRpc   response message; its pCont is freed before returning.
 */
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp rsp = {0};

  const bool decoded = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0);
  if (decoded) {
    taosAnalyUpdate(rsp.ver, rsp.hash);
    // Detach the hash before the free below; presumably taosAnalyUpdate()
    // keeps it -- confirm against its implementation.
    rsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
2✔
132

133
/**
 * RPC callback: validate an incoming message and hand it to the owning node.
 *
 * Steps, in order: check client/server version compatibility, check the
 * forbidden-IP flag, short-circuit a few message types that are handled
 * inline, check the dnode state, pick the target wrapper (optionally by the
 * vgId in the message head), copy the message into a queue item and dispatch
 * it through dmProcessNodeMsg().
 *
 * On any failure the error code is passed through dmConvertErrCode(); a
 * response is sent if the message is a request; the queue item (if any) and
 * pRpc->pCont are released.
 *
 * @param pDnode dnode instance; may be NULL while the transport is still being
 *               set up (see the TD-22618 note below).
 * @param pRpc   received message.
 * @param pEpSet endpoint set delivered with the message, may be NULL.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  // pDnode can be NULL here, so do not form member pointers through it.
  SDnodeTrans  *pTrans = (pDnode != NULL) ? &pDnode->trans : NULL;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = (pTrans != NULL) ? &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)] : NULL;

  // Not referenced explicitly below; presumably picked up by name by the
  // dGTrace/dGError macros, so the identifier must stay `trace`.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
    case TDMT_STREAM_FETCH_RSP:
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL && pDnode != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      if (pDnode == NULL) {
        break;  // handled by the NULL-dnode branch below
      }
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
      if (pDnode == NULL) {
        break;  // handled by the NULL-dnode branch below
      }
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      if (pDnode == NULL) {
        break;  // handled by the NULL-dnode branch below
      }
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode == NULL) {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }
    code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Only logged: an empty response carrying a network error is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    // The vgId is read from the message head, so the body must hold at least
    // one full SMsgHead; contLen > 0 alone is not sufficient.
    if (pRpc->contLen >= (int32_t)sizeof(SMsgHead)) {
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen:%d is shorter than msg head", pRpc, TMSG_INFO(pRpc->msgType), pRpc->contLen);
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // The wrapper was not marked, so it must not be released at the end.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype;  // rsp msg is not restricted by tsQueueMemoryUsed
  if (IsReq(pRpc)) {
    if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
      itype = DEF_QITEM;
    } else {
      itype = RPC_QITEM;
    }
  } else {
    itype = DEF_QITEM;
  }
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      // The response type is taken to be the request type plus one.
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Same send-and-log behavior whether or not a wrapper was selected.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
320

321
/**
 * Build the message dispatch tables from the handler lists of every node type.
 *
 * For each node type from DNODE up to (not including) NODE_END, the wrapper's
 * getHandlesFp() supplies an array of SMgmtHandle entries. Each entry fills
 * the wrapper's msgFps slot for its message type and updates the shared
 * per-type SDnodeHandle.
 *
 * @param pDnode dnode whose trans.msgHandles and wrappers[].msgFps are filled.
 * @return 0 on success, -1 if any wrapper returns a NULL handler array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    const int32_t numOfHandles = (int32_t)taosArrayGetSize(pHandles);
    for (int32_t idx = 0; idx < numOfHandles; ++idx) {
      SMgmtHandle  *pMgmt = taosArrayGet(pHandles, idx);
      const int32_t slot = TMSG_INDEX(pMgmt->msgType);
      SDnodeHandle *pHandle = &pTrans->msgHandles[slot];

      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        // Only entries that do not route by vgId set the default node type.
        pHandle->defaultNtype = ntype;
      }
      pWrapper->msgFps[slot] = pMgmt->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
346

347
/**
 * Send a request through the dnode's client RPC handle.
 *
 * While the dnode is not running, message types below TDMT_SYNC_MSG_MIN are
 * refused: the body is freed, pMsg->pCont is cleared and a starting/stopping
 * code is returned.
 *
 * @param pEpSet destination endpoint set.
 * @param pMsg   request to send; info.handle is reset to 0 before sending.
 * @return 0 on success, otherwise the rejection code or the code returned by
 *         rpcSendRequest().
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!refused) {
    pMsg->info.handle = 0;
    int32_t sendCode = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    if (sendCode != 0) {
      dError("failed to send rpc msg");
    }
    return sendCode;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
371
/**
 * Send a request through the dnode's sync RPC handle.
 *
 * While the dnode is not running, message types below TDMT_SYNC_MSG_MIN are
 * refused: the body is freed, pMsg->pCont is cleared and a starting/stopping
 * code is returned.
 *
 * @param pEpSet destination endpoint set.
 * @param pMsg   request to send.
 * @return the rejection code, or whatever rpcSendRequest() returns.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!refused) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
389

390
/**
 * Forward to rpcRegisterBrokenLinkArg(); its result is deliberately discarded.
 *
 * @param pMsg message passed through unchanged.
 */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
2,269,106✔
391

392
/**
 * Forward to rpcReleaseHandle(); its result is deliberately discarded.
 *
 * @param pHandle handle info passed through unchanged.
 * @param type    passed through unchanged.
 * @param status  passed through unchanged.
 */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
  return;
}
2,262,687✔
395

396
/**
 * Classify an (error code, message type) pair for the RPC client's `rfp` hook.
 *
 * @param code    error code of the failed request.
 * @param msgType type of the failed request.
 * @return true when the code is one of the listed network/availability errors
 *         and the message type is not in the excluded set; false otherwise.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool codeMatches =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!codeMatches) {
    return false;
  }

  // These message types are excluded even when the code matches.
  switch (msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
    case TDMT_SCH_TASK_NOTIFY:
    case TDMT_VND_DROP_TTL_TABLE:
      return false;
    default:
      return true;
  }
}
410
/**
 * Classify a message type for the RPC client's `noDelayFp` hook.
 *
 * @param msgType message type to classify.
 * @return true for the vnode message types listed below, false otherwise.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_SSMIGRATE:
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
    case TDMT_VND_DROP_TTL_TABLE:
    case TDMT_VND_FOLLOWER_SSMIGRATE:
      return true;
    default:
      return false;
  }
}
418
/**
 * Open the dnode's general-purpose RPC client ("DNODE-CLI") and store it in
 * pDnode->trans.clientRpc.
 *
 * @param pDnode dnode instance; also installed as the RPC parent.
 * @return 0 on success, terrno if rpcOpen() fails.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A conversion failure is only logged; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
473
/**
 * Open the dnode's status RPC client ("DNODE-STA-CLI") and store it in
 * pDnode->trans.statusRpc.
 *
 * @param pDnode dnode instance; also installed as the RPC parent.
 * @return 0 on success, terrno if rpcOpen() fails.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed connection limit, passed through the same [10, 500] clamp.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
      .ipv6 = tsEnableIpv6,
  };

  // A conversion failure is only logged; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
523

524
/**
 * Open the dnode's sync RPC client ("DNODE-SYNC-CLI") and store it in
 * pDnode->trans.syncRpc.
 *
 * @param pDnode dnode instance; also installed as the RPC parent.
 * @return 0 on success, terrno if rpcOpen() fails.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  // A conversion failure is only logged; the client is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
574

575
/**
 * Close the general RPC client if it is open and clear the stored handle.
 *
 * @param pDnode dnode whose trans.clientRpc is closed.
 */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;  // nothing was opened, or it is already closed
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
2,310✔
583
/**
 * Close the status RPC client if it is open and clear the stored handle.
 *
 * @param pDnode dnode whose trans.statusRpc is closed.
 */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;  // nothing was opened, or it is already closed
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
2,310✔
591
/**
 * Close the sync RPC client if it is open and clear the stored handle.
 *
 * @param pDnode dnode whose trans.syncRpc is closed.
 */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;  // nothing was opened, or it is already closed
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
2,310✔
599

600
/**
 * Open the dnode's RPC server ("DND-S") on tsLocalFqdn:tsServerPort and store
 * it in pDnode->trans.serverRpc.
 *
 * @param pDnode dnode instance; also installed as the RPC parent.
 * @return 0 on success, terrno if rpcOpen() fails.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
      .ipv6 = tsEnableIpv6,
  };
  // localFqdn is filled with a bounded string copy rather than an assignment.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  // A conversion failure is only logged; the server is still opened.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
632

633
/**
 * Close the RPC server if it is open and clear the stored handle.
 *
 * @param pDnode dnode whose trans.serverRpc is closed.
 */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;  // nothing was opened, or it is already closed
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
2,310✔
641

642
/**
 * Build the message-callback bundle that exposes this dnode's RPC handles and
 * transport helpers.
 *
 * @param pDnode dnode whose RPC handles and data are referenced.
 * @return an SMsgCb by value; fields not listed here are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       msgCb = {0};

  // RPC handles opened by the dmInit*Client()/dmInitServer() functions.
  msgCb.clientRpc = pTrans->clientRpc;
  msgCb.serverRpc = pTrans->serverRpc;
  msgCb.statusRpc = pTrans->statusRpc;
  msgCb.syncRpc = pTrans->syncRpc;

  // Transport helpers.
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendSyncReqFp = dmSendSyncReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;

  // Dnode-level callbacks and shared data.
  msgCb.reportStartupFp = dmReportStartup;
  msgCb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  msgCb.getDnodeEpFp = dmGetDnodeEp;
  msgCb.data = &pDnode->data;

  return msgCb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc