• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4469

08 Jul 2025 09:38AM UTC coverage: 62.22% (-1.2%) from 63.381%
#4469

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

153678 of 316510 branches covered (48.55%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

5035 existing lines in 221 files now uncovered.

238955 of 314529 relevant lines covered (75.97%)

6273248.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.43
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
static inline void dmSendRsp(SRpcMsg *pMsg) {
5,803✔
23
  if (rpcSendResponse(pMsg) != 0) {
5,803!
24
    dError("failed to send response, msg:%p", pMsg);
×
25
  }
26
}
5,803✔
27

28
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
377✔
29
  SEpSet epSet = {0};
377✔
30
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
377✔
31

32
  if (epSet.numOfEps <= 1) {
378✔
33
    if (epSet.numOfEps == 0) {
184!
34
      pMsg->pCont = NULL;
×
35
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
36
      return;
×
37
    }
38
    // dnode is not the mnode or mnode leader  and This ensures that the function correctly handles cases where the
39
    // dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
40
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
184!
41
      pMsg->pCont = NULL;
×
42
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
43
      return;
×
44
    }
45
  }
46

47
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
378✔
48
  pMsg->pCont = rpcMallocCont(contLen);
375✔
49
  if (pMsg->pCont == NULL) {
375!
50
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
51
  } else {
52
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
375✔
53
    if (contLen < 0) {
376!
54
      pMsg->code = contLen;
×
55
      return;
×
56
    }
57
    pMsg->contLen = contLen;
376✔
58
  }
59
}
60

61
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
11,585,104✔
62
  const STraceId *trace = &pMsg->info.traceId;
11,585,104✔
63

64
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
11,585,104✔
65
  if (msgFp == NULL) {
11,585,104!
66
    // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
67
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
×
68
    return TSDB_CODE_MSG_NOT_PROCESSED;
×
69
  }
70

71
  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
11,585,104!
72
  pMsg->info.wrapper = pWrapper;
11,585,105✔
73
  return (*msgFp)(pWrapper->pMgmt, pMsg);
11,585,105✔
74
}
75

76
static bool dmFailFastFp(tmsg_t msgType) {
×
77
  // add more msg type later
78
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
×
79
}
80

81
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
83,799✔
82
  if (code != TSDB_CODE_APP_IS_STOPPING) {
83,799✔
83
    return code;
9,878✔
84
  }
85
  if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
73,921✔
86
      (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
23,322✔
87
    code = TSDB_CODE_VND_STOPPED;
9,655✔
88
  }
89
  return code;
73,921✔
90
}
91
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
6✔
92
  int32_t        code = 0;
6✔
93
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
6✔
94
  code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
6✔
95
  if (code < 0) {
6!
96
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
97
    return;
×
98
  }
99
  code = rpcSetIpWhite(pTrans, &ipWhite);
6✔
100
  pData->ipWhiteVer = ipWhite.ver;
6✔
101

102
  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);
6✔
103

104
  rpcFreeCont(pRpc->pCont);
6✔
105
}
106

107
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
×
108
  int32_t code = TSDB_CODE_INVALID_MSG;
×
109
  dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
110
  rpcFreeCont(pRpc->pCont);
×
111
  pRpc->pCont = NULL;
×
112
  return;
×
113
}
114
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
12,763,609✔
115
  if (forbidden) {
12,763,609!
116
    dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
×
117
    return true;
×
118
  } else {
119
    return false;
12,763,609✔
120
  }
121
}
122

123
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
2✔
124
  SRetrieveAnalyticAlgoRsp rsp = {0};
2✔
125
  if (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
2!
126
    taosAnalyUpdate(rsp.ver, rsp.hash);
2✔
127
    rsp.hash = NULL;
2✔
128
  }
129
  tFreeRetrieveAnalyticAlgoRsp(&rsp);
2✔
130
  rpcFreeCont(pRpc->pCont);
2✔
131
}
2✔
132

133
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
12,766,045✔
134
  SDnodeTrans  *pTrans = &pDnode->trans;
12,766,045✔
135
  int32_t       code = -1;
12,766,045✔
136
  SRpcMsg      *pMsg = NULL;
12,766,045✔
137
  SMgmtWrapper *pWrapper = NULL;
12,766,045✔
138
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
12,766,045✔
139

140
  const STraceId *trace = &pRpc->info.traceId;
12,766,045✔
141
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
12,766,045!
142
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
143

144
  int32_t svrVer = 0;
12,766,044✔
145
  code = taosVersionStrToInt(td_version, &svrVer);
12,766,044✔
146
  if (code != 0) {
12,765,430!
147
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
×
148
    goto _OVER;
×
149
  }
150
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
12,765,430!
151
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
×
152
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
153
    goto _OVER;
×
154
  }
155

156
  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr);
12,772,839✔
157
  if (isForbidden) {
12,766,613!
158
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
159
    goto _OVER;
×
160
  }
161

162
  switch (pRpc->msgType) {
12,766,613!
163
    case TDMT_DND_NET_TEST:
×
164
      dmProcessNetTestReq(pDnode, pRpc);
×
165
      return;
1,105,563✔
166
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
1,104,805✔
167
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
168
    case TDMT_SCH_FETCH_RSP:
169
    case TDMT_SCH_MERGE_FETCH_RSP:
170
    case TDMT_VND_SUBMIT_RSP:
171
    case TDMT_MND_GET_DB_INFO_RSP:
172
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
1,104,805✔
173
      return;
1,105,555✔
174
    case TDMT_MND_STATUS_RSP:
×
175
      if (pEpSet != NULL) {
×
176
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
×
177
      }
178
      break;
×
179
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
×
180
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
×
181
      return;
×
182
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
6✔
183
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
6✔
184
      return;
6✔
185
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
2✔
186
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
2✔
187
      return;
2✔
188
    default:
11,661,800✔
189
      break;
11,661,800✔
190
  }
191

192
  /*
193
  pDnode is null, TD-22618
194
  at trans.c line 91
195
  before this line, dmProcessRpcMsg callback is set
196
  after this line, parent is set
197
  so when dmProcessRpcMsg is called, pDonde is still null.
198
  */
199
  if (pDnode != NULL) {
11,661,800✔
200
    if (pDnode->status != DND_STAT_RUNNING) {
11,658,892✔
201
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
77,578!
202
        dmProcessServerStartupStatus(pDnode, pRpc);
×
203
        return;
×
204
      } else {
205
        if (pDnode->status == DND_STAT_INIT) {
77,578✔
206
          code = TSDB_CODE_APP_IS_STARTING;
3,656✔
207
        } else {
208
          code = TSDB_CODE_APP_IS_STOPPING;
73,922✔
209
        }
210
        goto _OVER;
77,578✔
211
      }
212
    }
213
  } else {
214
    code = TSDB_CODE_APP_IS_STARTING;
2,908✔
215
    goto _OVER;
2,908✔
216
  }
217

218
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
11,581,314!
219
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
×
220
    code = TSDB_CODE_INVALID_MSG_LEN;
×
221
    goto _OVER;
×
222
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
11,581,314✔
223
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
14,647!
224
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
15!
225
  }
226

227
  if (pHandle->defaultNtype == NODE_END) {
11,581,314!
228
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
×
229
    code = TSDB_CODE_MSG_NOT_PROCESSED;
×
230
    goto _OVER;
×
231
  }
232

233
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
11,581,314✔
234
  if (pHandle->needCheckVgId) {
11,581,314✔
235
    if (pRpc->contLen > 0) {
8,212,523!
236
      const SMsgHead *pHead = pRpc->pCont;
8,215,341✔
237
      const int32_t   vgId = ntohl(pHead->vgId);
8,215,341✔
238
      switch (vgId) {
8,215,341✔
239
        case QNODE_HANDLE:
1,095,586✔
240
          pWrapper = &pDnode->wrappers[QNODE];
1,095,586✔
241
          break;
1,095,586✔
242
        case SNODE_HANDLE:
2,801✔
243
          pWrapper = &pDnode->wrappers[SNODE];
2,801✔
244
          break;
2,801✔
245
        case MNODE_HANDLE:
131,737✔
246
          pWrapper = &pDnode->wrappers[MNODE];
131,737✔
247
          break;
131,737✔
248
        default:
6,985,217✔
249
          break;
6,985,217✔
250
      }
251
    } else {
252
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
×
253
      code = TSDB_CODE_INVALID_MSG_LEN;
×
254
      goto _OVER;
×
255
    }
256
  }
257

258
  if ((code = dmMarkWrapper(pWrapper)) != 0) {
11,584,132✔
259
    pWrapper = NULL;
411✔
260
    goto _OVER;
411✔
261
  }
262

263
  pRpc->info.wrapper = pWrapper;
11,589,700✔
264

265
  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
11,589,700✔
266
  if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
11,589,700✔
267
    itype = DEF_QITEM;
95,370✔
268
  } else if (IsReq(pRpc)) {
11,494,330✔
269
    itype = APPLY_QITEM;
11,358,759✔
270
  }
271
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
11,589,700✔
272
  if (code) goto _OVER;
11,594,932!
273

274
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
11,594,932✔
275
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
11,594,932!
276
          pRpc->contLen);
277

278
  code = dmProcessNodeMsg(pWrapper, pMsg);
11,594,935✔
279

280
_OVER:
11,670,252✔
281
  if (code != 0) {
11,670,252✔
282
    code = dmConvertErrCode(pRpc->msgType, code);
83,800✔
283
    if (pMsg) {
83,799✔
284
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
5,806!
285
    } else {
286
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
77,993!
287
    }
288

289
    if (IsReq(pRpc)) {
83,799✔
290
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
83,688✔
291
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
83,688✔
292
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
381✔
293
      }
294

295
      if (pWrapper != NULL) {
83,686✔
296
        dmSendRsp(&rsp);
5,803✔
297
      } else {
298
        if (rpcSendResponse(&rsp) != 0) {
77,883!
299
          dError("failed to send response, msg:%p", &rsp);
×
300
        }
301
      }
302
    }
303

304
    if (pMsg != NULL) {
83,798✔
305
      dGTrace("msg:%p, is freed", pMsg);
5,805!
306
      taosFreeQitem(pMsg);
5,805✔
307
    }
308
    rpcFreeCont(pRpc->pCont);
83,799✔
309
    pRpc->pCont = NULL;
83,801✔
310
  }
311

312
  dmReleaseWrapper(pWrapper);
11,670,253✔
313
}
314

315
int32_t dmInitMsgHandle(SDnode *pDnode) {
3,061✔
316
  SDnodeTrans *pTrans = &pDnode->trans;
3,061✔
317

318
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
21,427✔
319
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
18,366✔
320
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
18,366✔
321
    if (pArray == NULL) return -1;
18,366!
322

323
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
1,111,143✔
324
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
1,092,777✔
325
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
1,092,777✔
326
      if (pMgmt->needCheckVgId) {
1,092,777✔
327
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
183,660✔
328
      }
329
      if (!pMgmt->needCheckVgId) {
1,092,777✔
330
        pHandle->defaultNtype = ntype;
909,117✔
331
      }
332
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
1,092,777✔
333
    }
334

335
    taosArrayDestroy(pArray);
18,366✔
336
  }
337

338
  return 0;
3,061✔
339
}
340

341
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
359,036✔
342
  int32_t code = 0;
359,036✔
343
  SDnode *pDnode = dmInstance();
359,036✔
344
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
359,037✔
345
    rpcFreeCont(pMsg->pCont);
677✔
346
    pMsg->pCont = NULL;
677✔
347
    if (pDnode->status == DND_STAT_INIT) {
677!
348
      code = TSDB_CODE_APP_IS_STARTING;
×
349
    } else {
350
      code = TSDB_CODE_APP_IS_STOPPING;
677✔
351
    }
352
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
677!
353
           pMsg->info.handle);
354
    return code;
677✔
355
  } else {
356
    pMsg->info.handle = 0;
358,360✔
357
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
358,360!
358
      dError("failed to send rpc msg");
×
359
    }
360
    return 0;
358,360✔
361
  }
362
}
363
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
1,057,783✔
364
  int32_t code = 0;
1,057,783✔
365
  SDnode *pDnode = dmInstance();
1,057,783✔
366
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
1,057,791!
367
    rpcFreeCont(pMsg->pCont);
×
368
    pMsg->pCont = NULL;
×
369
    if (pDnode->status == DND_STAT_INIT) {
×
370
      code = TSDB_CODE_APP_IS_STARTING;
×
371
    } else {
372
      code = TSDB_CODE_APP_IS_STOPPING;
×
373
    }
374
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
375
           pMsg->info.handle);
376
    return code;
×
377
  } else {
378
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
1,057,791✔
379
  }
380
}
381

382
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
3,056,643✔
383

384
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
3,051,957✔
385
  (void)rpcReleaseHandle(pHandle, type, status);
3,051,957✔
386
}
3,053,315✔
387

388
static bool rpcRfp(int32_t code, tmsg_t msgType) {
64,395✔
389
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
64,395!
390
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
52,805✔
391
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
14,551✔
392
      code == TSDB_CODE_APP_IS_STOPPING) {
393
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
58,095!
394
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE) {
58,095!
UNCOV
395
      return false;
×
396
    }
397
    return true;
58,095✔
398
  } else {
399
    return false;
6,300✔
400
  }
401
}
402
static bool rpcNoDelayMsg(tmsg_t msgType) {
×
403
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
×
404
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
×
405
    return true;
×
406
  }
407
  return false;
×
408
}
409
int32_t dmInitClient(SDnode *pDnode) {
3,056✔
410
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
411

412
  SRpcInit rpcInit = {0};
3,056✔
413
  rpcInit.label = "DNODE-CLI";
3,056✔
414
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
3,056✔
415
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
3,056✔
416
  rpcInit.sessions = 1024;
3,056✔
417
  rpcInit.connType = TAOS_CONN_CLIENT;
3,056✔
418
  rpcInit.user = TSDB_DEFAULT_USER;
3,056✔
419
  rpcInit.idleTime = tsShellActivityTimer * 1000;
3,056✔
420
  rpcInit.parent = pDnode;
3,056✔
421
  rpcInit.rfp = rpcRfp;
3,056✔
422
  rpcInit.compressSize = tsCompressMsgSize;
3,056✔
423
  rpcInit.dfp = destroyAhandle;
3,056✔
424

425
  rpcInit.retryMinInterval = tsRedirectPeriod;
3,056✔
426
  rpcInit.retryStepFactor = tsRedirectFactor;
3,056✔
427
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
3,056✔
428
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
3,056✔
429

430
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
3,056✔
431
  rpcInit.failFastThreshold = 3;    // failed threshold
3,056✔
432
  rpcInit.ffp = dmFailFastFp;
3,056✔
433

434
  rpcInit.noDelayFp = rpcNoDelayMsg;
3,056✔
435

436
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
3,056✔
437
  connLimitNum = TMAX(connLimitNum, 10);
3,056✔
438
  connLimitNum = TMIN(connLimitNum, 500);
3,056✔
439

440
  rpcInit.connLimitNum = connLimitNum;
3,056✔
441
  rpcInit.connLimitLock = 1;
3,056✔
442
  rpcInit.supportBatch = 1;
3,056✔
443
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
3,056✔
444
  rpcInit.shareConn = 1;
3,056✔
445
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
3,056✔
446
  rpcInit.notWaitAvaliableConn = 0;
3,056✔
447
  rpcInit.startReadTimer = 1;
3,056✔
448
  rpcInit.readTimeout = tsReadTimeout;
3,056✔
449
  rpcInit.ipv6 = tsEnableIpv6;
3,056✔
450

451
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
3,056!
452
    dError("failed to convert version string:%s to int", td_version);
×
453
  }
454

455
  pTrans->clientRpc = rpcOpen(&rpcInit);
3,056✔
456
  if (pTrans->clientRpc == NULL) {
3,056!
457
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
×
458
    return terrno;
×
459
  }
460

461
  dDebug("dnode rpc client is initialized");
3,056✔
462
  return 0;
3,056✔
463
}
464
int32_t dmInitStatusClient(SDnode *pDnode) {
3,056✔
465
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
466

467
  SRpcInit rpcInit = {0};
3,056✔
468
  rpcInit.label = "DNODE-STA-CLI";
3,056✔
469
  rpcInit.numOfThreads = 1;
3,056✔
470
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
3,056✔
471
  rpcInit.sessions = 1024;
3,056✔
472
  rpcInit.connType = TAOS_CONN_CLIENT;
3,056✔
473
  rpcInit.user = TSDB_DEFAULT_USER;
3,056✔
474
  rpcInit.idleTime = tsShellActivityTimer * 1000;
3,056✔
475
  rpcInit.parent = pDnode;
3,056✔
476
  rpcInit.rfp = rpcRfp;
3,056✔
477
  rpcInit.compressSize = tsCompressMsgSize;
3,056✔
478

479
  rpcInit.retryMinInterval = tsRedirectPeriod;
3,056✔
480
  rpcInit.retryStepFactor = tsRedirectFactor;
3,056✔
481
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
3,056✔
482
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
3,056✔
483

484
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
3,056✔
485
  rpcInit.failFastThreshold = 3;    // failed threshold
3,056✔
486
  rpcInit.ffp = dmFailFastFp;
3,056✔
487

488
  int32_t connLimitNum = 100;
3,056✔
489
  connLimitNum = TMAX(connLimitNum, 10);
3,056✔
490
  connLimitNum = TMIN(connLimitNum, 500);
3,056✔
491

492
  rpcInit.connLimitNum = connLimitNum;
3,056✔
493
  rpcInit.connLimitLock = 1;
3,056✔
494
  rpcInit.supportBatch = 1;
3,056✔
495
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
3,056✔
496
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
3,056✔
497
  rpcInit.startReadTimer = 0;
3,056✔
498
  rpcInit.readTimeout = 0;
3,056✔
499
  rpcInit.ipv6 = tsEnableIpv6;
3,056✔
500

501
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
3,056!
502
    dError("failed to convert version string:%s to int", td_version);
×
503
  }
504

505
  pTrans->statusRpc = rpcOpen(&rpcInit);
3,056✔
506
  if (pTrans->statusRpc == NULL) {
3,056!
507
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
×
508
    return terrno;
×
509
  }
510

511
  dDebug("dnode rpc status client is initialized");
3,056✔
512
  return 0;
3,056✔
513
}
514

515
int32_t dmInitSyncClient(SDnode *pDnode) {
3,056✔
516
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
517

518
  SRpcInit rpcInit = {0};
3,056✔
519
  rpcInit.label = "DNODE-SYNC-CLI";
3,056✔
520
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
3,056✔
521
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
3,056✔
522
  rpcInit.sessions = 1024;
3,056✔
523
  rpcInit.connType = TAOS_CONN_CLIENT;
3,056✔
524
  rpcInit.user = TSDB_DEFAULT_USER;
3,056✔
525
  rpcInit.idleTime = tsShellActivityTimer * 1000;
3,056✔
526
  rpcInit.parent = pDnode;
3,056✔
527
  rpcInit.rfp = rpcRfp;
3,056✔
528
  rpcInit.compressSize = tsCompressMsgSize;
3,056✔
529

530
  rpcInit.retryMinInterval = tsRedirectPeriod;
3,056✔
531
  rpcInit.retryStepFactor = tsRedirectFactor;
3,056✔
532
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
3,056✔
533
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
3,056✔
534

535
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
3,056✔
536
  rpcInit.failFastThreshold = 3;    // failed threshold
3,056✔
537
  rpcInit.ffp = dmFailFastFp;
3,056✔
538

539
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
3,056✔
540
  connLimitNum = TMAX(connLimitNum, 10);
3,056✔
541
  connLimitNum = TMIN(connLimitNum, 500);
3,056✔
542

543
  rpcInit.connLimitNum = connLimitNum;
3,056✔
544
  rpcInit.connLimitLock = 1;
3,056✔
545
  rpcInit.supportBatch = 1;
3,056✔
546
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
3,056✔
547
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
3,056✔
548
  rpcInit.startReadTimer = 1;
3,056✔
549
  rpcInit.readTimeout = tsReadTimeout;
3,056✔
550
  rpcInit.ipv6 = tsEnableIpv6;
3,056✔
551

552
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
3,056!
553
    dError("failed to convert version string:%s to int", td_version);
×
554
  }
555

556
  pTrans->syncRpc = rpcOpen(&rpcInit);
3,056✔
557
  if (pTrans->syncRpc == NULL) {
3,056!
558
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
×
559
    return terrno;
×
560
  }
561

562
  dDebug("dnode rpc sync client is initialized");
3,056✔
563
  return 0;
3,056✔
564
}
565

566
void dmCleanupClient(SDnode *pDnode) {
3,056✔
567
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
568
  if (pTrans->clientRpc) {
3,056!
569
    rpcClose(pTrans->clientRpc);
3,056✔
570
    pTrans->clientRpc = NULL;
3,056✔
571
    dDebug("dnode rpc client is closed");
3,056✔
572
  }
573
}
3,056✔
574
void dmCleanupStatusClient(SDnode *pDnode) {
3,056✔
575
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
576
  if (pTrans->statusRpc) {
3,056!
577
    rpcClose(pTrans->statusRpc);
3,056✔
578
    pTrans->statusRpc = NULL;
3,056✔
579
    dDebug("dnode rpc status client is closed");
3,056✔
580
  }
581
}
3,056✔
582
void dmCleanupSyncClient(SDnode *pDnode) {
3,056✔
583
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
584
  if (pTrans->syncRpc) {
3,056!
585
    rpcClose(pTrans->syncRpc);
3,056✔
586
    pTrans->syncRpc = NULL;
3,056✔
587
    dDebug("dnode rpc sync client is closed");
3,056✔
588
  }
589
}
3,056✔
590

591
int32_t dmInitServer(SDnode *pDnode) {
3,061✔
592
  int32_t      code = 0;
3,061✔
593
  SDnodeTrans *pTrans = &pDnode->trans;
3,061✔
594

595
  SRpcInit rpcInit = {0};
3,061✔
596
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
3,061✔
597

598
  rpcInit.localPort = tsServerPort;
3,061✔
599
  rpcInit.label = "DND-S";
3,061✔
600
  rpcInit.numOfThreads = tsNumOfRpcThreads;
3,061✔
601
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
3,061✔
602
  rpcInit.sessions = tsMaxShellConns;
3,061✔
603
  rpcInit.connType = TAOS_CONN_SERVER;
3,061✔
604
  rpcInit.idleTime = tsShellActivityTimer * 1000;
3,061✔
605
  rpcInit.parent = pDnode;
3,061✔
606
  rpcInit.compressSize = tsCompressMsgSize;
3,061✔
607
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
3,061✔
608
  rpcInit.ipv6 = tsEnableIpv6;
3,061✔
609

610
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
3,061!
611
    dError("failed to convert version string:%s to int", td_version);
×
612
  }
613

614
  pTrans->serverRpc = rpcOpen(&rpcInit);
3,061✔
615
  if (pTrans->serverRpc == NULL) {
3,061✔
616
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
5!
617
    return terrno;
5✔
618
  }
619

620
  dDebug("dnode rpc server is initialized");
3,056✔
621
  return 0;
3,056✔
622
}
623

624
void dmCleanupServer(SDnode *pDnode) {
3,056✔
625
  SDnodeTrans *pTrans = &pDnode->trans;
3,056✔
626
  if (pTrans->serverRpc) {
3,056!
627
    rpcClose(pTrans->serverRpc);
3,056✔
628
    pTrans->serverRpc = NULL;
3,056✔
629
    dDebug("dnode rpc server is closed");
3,056✔
630
  }
631
}
3,056✔
632

633
SMsgCb dmGetMsgcb(SDnode *pDnode) {
28,496✔
634
  SMsgCb msgCb = {
28,496✔
635
      .clientRpc = pDnode->trans.clientRpc,
28,496✔
636
      .serverRpc = pDnode->trans.serverRpc,
28,496✔
637
      .statusRpc = pDnode->trans.statusRpc,
28,496✔
638
      .syncRpc = pDnode->trans.syncRpc,
28,496✔
639
      .sendReqFp = dmSendReq,
640
      .sendSyncReqFp = dmSendSyncReq,
641
      .sendRspFp = dmSendRsp,
642
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
643
      .releaseHandleFp = dmReleaseHandle,
644
      .reportStartupFp = dmReportStartup,
645
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
646
      .getDnodeEpFp = dmGetDnodeEp,
647
      .data = &pDnode->data,
28,496✔
648
  };
649
  return msgCb;
28,496✔
650
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc