• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3610

12 Feb 2025 09:54AM UTC coverage: 54.713% (-8.4%) from 63.066%
#3610

push

travis-ci

web-flow
Merge pull request #29745 from taosdata/fix/TD33664-3.0

fix: --version show information check for 3.0

120957 of 286549 branches covered (42.21%)

Branch coverage included in aggregate %.

190849 of 283342 relevant lines covered (67.36%)

4969786.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.62
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
/*
 * Hands a response message to rpcSendResponse().
 * A non-zero result is only logged; nothing is reported back to the caller.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
6,910✔
27

28
/*
 * Fills pMsg with a redirect payload: the serialized mnode endpoint set
 * returned by dmGetMnodeEpSetForRedirect().
 *
 * Outcomes written into pMsg:
 *   - no endpoint, or the only endpoint is this dnode itself:
 *       pCont = NULL, code = TSDB_CODE_MNODE_NOT_FOUND
 *   - size computation fails:       pCont = NULL, code = the negative result
 *   - allocation fails:             code = TSDB_CODE_OUT_OF_MEMORY
 *   - serialization fails:          code = the negative result
 *   - success:                      pCont/contLen hold the serialized SEpSet
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The single known endpoint is this dnode itself, so redirecting the
    // client to it would be misleading; report "mnode not found" instead.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the required size.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Do not pass a negative length to the allocator; treat it like the
    // serialization failure handled below.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
    if (contLen < 0) {
      pMsg->code = contLen;
      return;
    }
    pMsg->contLen = contLen;
  }
}
60

61
/*
 * Dispatches pMsg to the handler registered in pWrapper for its message type.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the
 * wrapper has no handler for this type. pMsg->info.wrapper is set to pWrapper
 * before the handler runs.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // referenced by the dG* log macros

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (msgFp != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*msgFp)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
/*
 * Fail-fast predicate installed as SRpcInit.ffp by the client initializers.
 * True only for sync heartbeat and sync append-entries messages.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
80

81
/*
 * Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED when the message
 * type lies strictly inside the VND or SCH message ranges. Every other
 * code/type combination is returned unchanged.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    const bool inVndRange = (msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX);
    const bool inSchRange = (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX);
    if (inVndRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
/*
 * Applies an IP white-list update carried in pRpc to the transport pTrans and
 * records the list version in pData->ipWhiteVer.
 *
 * The payload pRpc->pCont is released here on every path, including when the
 * payload cannot be deserialized (the caller returns right after this call
 * and does not free it).
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  int32_t        code = 0;
  SUpdateIpWhite ipWhite = {0};

  code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    rpcFreeCont(pRpc->pCont);  // previously leaked on this path
    return;
  }

  // NOTE(review): the result of rpcSetIpWhite() is stored but not acted upon,
  // and the version is recorded regardless - confirm this is intended.
  code = rpcSetIpWhite(pTrans, &ipWhite);
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
9,001,419✔
107
  if (forbidden) {
9,001,419!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
9,001,419✔
116
  }
117
}
118

119
/*
 * Decodes an analysis-algorithm response from pRpc and, on success, passes its
 * version and hash to taosAnalUpdate(). The decoded response and the payload
 * pRpc->pCont are released on every path. pData and pTrans are not used.
 */
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalAlgoRsp rsp = {0};

  const bool decoded = (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0);
  if (decoded) {
    taosAnalUpdate(rsp.ver, rsp.hash);
    // Cleared before tFreeRetrieveAnalAlgoRsp(); presumably taosAnalUpdate()
    // keeps the hash - TODO confirm ownership.
    rsp.hash = NULL;
  }

  tFreeRetrieveAnalAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
×
128

129
/*
 * Transport callback (installed as SRpcInit.cfp by the dmInit* functions) that
 * validates an incoming RPC message and routes it to the owning node wrapper.
 *
 * Steps, in order:
 *   1. version compatibility check and IP white-list check;
 *   2. a few message types are handled inline and return immediately;
 *   3. dnode status check (not running => APP_IS_STARTING / APP_IS_STOPPING);
 *   4. payload sanity checks and wrapper selection (optionally by vgId);
 *   5. the message is copied into a queue item and given to dmProcessNodeMsg().
 *
 * On any failure (code != 0 at _OVER) a response carrying the error is sent
 * for request messages, the queue item (if any) is freed and pRpc->pCont is
 * released and set to NULL.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;  // referenced by the dG* log macros
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
           pRpc->info.conn.clientIp);
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // Message types handled inline. Every case that returns here bypasses the
  // _OVER cleanup below.
  // NOTE(review): these cases read through pDnode/pTrans before the NULL
  // check further down - confirm pDnode cannot be NULL for these types.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      // The result was never read before returning; discard it explicitly.
      (void)qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
   * pDnode can be NULL here (TD-22618): at trans.c line 91 the
   * dmProcessRpcMsg callback is set before the parent pointer is, so a
   * message arriving in between is delivered while pDnode is still NULL.
   */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; processing continues.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  // Start from the default node type; the vgId in the message head may
  // override it with the qnode, snode or mnode wrapper.
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Marking failed: clear the pointer so the release at the end gets NULL.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // NOTE(review): the previous code initialised the item type to RPC_QITEM and
  // then conditionally re-assigned RPC_QITEM for non-heartbeat requests, so
  // every message ended up with RPC_QITEM. The dead branch is removed here;
  // confirm whether a different type was intended for responses/heartbeats.
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Both former branches (wrapper set / not set) sent the response and
      // logged the same message on failure, which is what dmSendRsp() does.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
303

304
/*
 * Builds the message routing tables from the handle lists of every node type.
 *
 * For each handle entry: the handler function is stored in the wrapper's
 * msgFps table; an entry with needCheckVgId set copies that flag into the
 * shared handle slot, any other entry records its node type as the slot's
 * default.
 *
 * Returns 0 on success, -1 when a node type yields no handle array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
329

330
/*
 * Sends a request through the dnode's client RPC handle.
 *
 * When the dnode is not running and the message type is below
 * TDMT_SYNC_MSG_MIN, the payload is freed, pMsg->pCont is set to NULL and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 * Otherwise pMsg->info.handle is reset to 0 and the request is sent; a send
 * failure is logged but 0 is still returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool rejected = (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    pMsg->info.handle = 0;
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
      dError("failed to send rpc msg");
    }
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = 0;
  if (pDnode->status == DND_STAT_INIT) {
    code = TSDB_CODE_APP_IS_STARTING;
  } else {
    code = TSDB_CODE_APP_IS_STOPPING;
  }
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
352
/*
 * Sends a request through the dnode's sync RPC handle and returns the result
 * of rpcSendRequest().
 *
 * When the dnode is not running and the message type is below
 * TDMT_SYNC_MSG_MIN, the payload is freed, pMsg->pCont is set to NULL and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned instead.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool rejected = (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = 0;
  if (pDnode->status == DND_STAT_INIT) {
    code = TSDB_CODE_APP_IS_STARTING;
  } else {
    code = TSDB_CODE_APP_IS_STOPPING;
  }
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
370

371
/* Forwards pMsg to rpcRegisterBrokenLinkArg(); its result is deliberately discarded. */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
2,408,503✔
372

373
/* Forwards its arguments to rpcReleaseHandle(); the result is deliberately discarded. */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
}
2,410,243✔
376

377
/*
 * Predicate installed as SRpcInit.rfp by the client initializers.
 *
 * Returns true when `code` is one of the listed network / leadership /
 * start-stop error codes AND `msgType` is not one of the excluded scheduler
 * or TTL-drop message types. Returns false in every other case.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool codeMatches =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!codeMatches) {
    return false;
  }

  const bool typeExcluded =
      msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;
  return !typeExcluded;
}
391
/*
 * Predicate installed as SRpcInit.noDelayFp by dmInitClient().
 * Returns true for the message types listed below, false otherwise.
 *
 * NOTE(review): the previous condition tested TDMT_VND_S3MIGRATE twice. The
 * redundant operand is dropped here (no behaviour change); confirm whether a
 * different message type was meant to be in its place.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  return msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
         msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE;
}
398
/*
 * Opens the general-purpose client RPC endpoint ("DNODE-CLI") and stores it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
  return terrno;
}
452
/*
 * Opens the single-threaded status client RPC endpoint ("DNODE-STA-CLI") and
 * stores it in pDnode->trans.statusRpc. The read timer is disabled for it.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed starting value, passed through the same [10, 500] clamp as the
  // other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc != NULL) {
    dDebug("dnode rpc status client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
  return terrno;
}
501

502
/*
 * Opens the sync client RPC endpoint ("DNODE-SYNC-CLI") and stores it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc != NULL) {
    dDebug("dnode rpc sync client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
  return terrno;
}
551

552
/* Closes the client RPC endpoint if one is open and clears the stored pointer. */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
1,288✔
560
/* Closes the status client RPC endpoint if one is open and clears the stored pointer. */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
1,288✔
568
/* Closes the sync client RPC endpoint if one is open and clears the stored pointer. */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
1,288✔
576

577
/*
 * Opens the server RPC endpoint ("DND-S") on tsLocalFqdn:tsServerPort and
 * stores it in pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
  };
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc != NULL) {
    dDebug("dnode rpc server is initialized");
    return 0;
  }

  dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
  return terrno;
}
606

607
/* Closes the server RPC endpoint if one is open and clears the stored pointer. */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
1,288✔
615

616
/*
 * Returns, by value, a callback table populated with the dnode's four RPC
 * handles, the send/release/report helpers of this module and a pointer to
 * pDnode->data. Members not listed here are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       cb = {0};

  cb.clientRpc = pTrans->clientRpc;
  cb.serverRpc = pTrans->serverRpc;
  cb.statusRpc = pTrans->statusRpc;
  cb.syncRpc = pTrans->syncRpc;

  cb.sendReqFp = dmSendReq;
  cb.sendSyncReqFp = dmSendSyncReq;
  cb.sendRspFp = dmSendRsp;
  cb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  cb.releaseHandleFp = dmReleaseHandle;
  cb.reportStartupFp = dmReportStartup;
  cb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  cb.getDnodeEpFp = dmGetDnodeEp;

  cb.data = &pDnode->data;
  return cb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc