• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.93
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
/*
 * Hand a response message to the RPC layer.
 *
 * pMsg: response to send. A non-zero result from rpcSendResponse() is only
 *       logged; nothing is reported back to the caller.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  const bool sent = (rpcSendResponse(pMsg) == 0);
  if (sent) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
27

28
/*
 * Fill a response with the serialized mnode endpoint set so the requester can
 * be redirected.
 *
 * pDnode: dnode whose data is queried through dmGetMnodeEpSetForRedirect().
 * pMsg:   response being built. On success pCont/contLen carry the serialized
 *         SEpSet. On every failure path pCont is left NULL and pMsg->code is
 *         set to the failure code.
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The only endpoint is this dnode itself. Redirecting the requester back
    // here would be misleading, so report that no mnode was found instead.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the required length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Never hand a negative length to the allocator.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a buffer attached to a message whose contLen was never set.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
60

61
/*
 * Dispatch a message to the handler registered for its type on pWrapper.
 *
 * pWrapper: management wrapper whose msgFps table is consulted.
 * pMsg:     message to process; pMsg->info.wrapper is set to pWrapper before
 *           the handler runs.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when no
 * handler is registered for the message type.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not used directly below; presumably referenced by the dG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return handler(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
/*
 * Fail-fast predicate installed as SRpcInit.ffp by the client initializers.
 *
 * Returns true only for TDMT_SYNC_HEARTBEAT and TDMT_SYNC_APPEND_ENTRIES;
 * more message types may be added later.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
80

81
/*
 * Translate TSDB_CODE_APP_IS_STOPPING into TSDB_CODE_VND_STOPPED for message
 * types strictly inside the TDMT_VND_MSG_* or TDMT_SCH_MSG_* ranges.
 *
 * Any other code, and any other message type, is returned unchanged.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    const bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    const bool inSchRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
UNCOV
91
/*
 * Apply an IP white-list update carried by an RPC message.
 *
 * pData:  dnode data; ipWhiteVer is set to the version found in the message.
 * pTrans: RPC handle passed to rpcSetIpWhite().
 * pRpc:   message whose content holds the serialized SUpdateIpWhite. Its
 *         pCont is freed here on every path, because the caller returns
 *         immediately after this call and does not free it.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): a partially deserialized ipWhite may also need releasing;
    // confirm tFreeSUpdateIpWhiteReq() is safe to call on that state.
    rpcFreeCont(pRpc->pCont);
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // The result was previously stored but never looked at.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
55,758,991✔
107
  if (forbidden) {
55,758,991!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
55,758,991✔
116
  }
117
}
118

119
/*
 * Apply an analysis-algorithm list carried by an RPC message.
 *
 * pData and pTrans are not used in this function.
 * pRpc: message holding a serialized SRetrieveAnalAlgoRsp; its pCont is
 *       freed before returning.
 */
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalAlgoRsp algoRsp = {0};

  const bool decoded = (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0);
  if (decoded) {
    taosAnalUpdate(algoRsp.ver, algoRsp.hash);
    // Presumably taosAnalUpdate() keeps the hash; clear the field so the
    // free call below does not touch it. TODO confirm ownership.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
128

129
/*
 * RPC callback installed as SRpcInit.cfp for every dnode client and server.
 *
 * Flow:
 *   1. Check client/server version compatibility and the forbidden-ip flag.
 *   2. Handle a few message types inline (net test, query-worker responses,
 *      mnode status / ip-white / analysis-algorithm responses).
 *   3. Reject the message while the dnode is absent or not running.
 *   4. Pick the target wrapper, copy the message into a queue item and hand
 *      it to dmProcessNodeMsg().
 *   5. On any failure (_OVER) answer requests with the converted error code
 *      and release the queue item and the message content.
 *
 * pDnode: owning dnode; may be NULL during startup (see TD-22618 note below).
 * pRpc:   received message.
 * pEpSet: endpoint set delivered with the message; may be NULL.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  // Resolved only after pDnode is known to be non-NULL; forming member
  // addresses from a NULL pDnode is undefined behavior.
  SDnodeHandle *pHandle = NULL;

  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
           pRpc->info.conn.clientIp);
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      // The result is not acted on; the function returns right away.
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pDnode != NULL && pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      if (pDnode == NULL) {
        break;  // rejected below with TSDB_CODE_APP_IS_STARTING
      }
      dmUpdateRpcIpWhite(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      if (pDnode == NULL) {
        break;  // rejected below with TSDB_CODE_APP_IS_STARTING
      }
      dmUpdateAnalFunc(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; the message is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  pHandle = &pDnode->trans.msgHandles[TMSG_INDEX(pRpc->msgType)];
  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head may redirect the message to the
      // qnode, snode or mnode wrapper instead of the default one.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Marking failed, so dmReleaseWrapper() at the end must get NULL.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Both former branches (wrapper set / not set) sent the response and
      // logged the same message on failure, so a single call is enough.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
301

302
/*
 * Build the message dispatch tables from the handlers each node type exposes.
 *
 * For every node type from DNODE up to (not including) NODE_END, the
 * wrapper's getHandlesFp() list is walked. Each entry stores its msgFp in
 * the wrapper's table and records either needCheckVgId or the default node
 * type on the shared per-message handle.
 *
 * Returns 0 on success, -1 when a wrapper returns no handle array.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
327

328
/*
 * Send a request through the dnode client RPC (trans.clientRpc).
 *
 * pEpSet: destination endpoint set.
 * pMsg:   request. When the dnode is not running and the message type is
 *         below TDMT_SYNC_MSG_MIN, its content is freed and pCont is set to
 *         NULL. Otherwise info.handle is cleared before sending.
 *
 * Returns 0 on success, TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING
 * when the dnode is not running, or the rpcSendRequest() failure code. That
 * failure was previously logged and reported as 0; it is now propagated,
 * matching dmSendSyncReq().
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    if (pDnode->status == DND_STAT_INIT) {
      code = TSDB_CODE_APP_IS_STARTING;
    } else {
      code = TSDB_CODE_APP_IS_STOPPING;
    }
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
           pMsg->info.handle);
    return code;
  }

  pMsg->info.handle = 0;
  code = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
  if (code != 0) {
    dError("failed to send rpc msg");
    return code;
  }
  return 0;
}
350
/*
 * Send a request through the dnode sync RPC (trans.syncRpc).
 *
 * pEpSet: destination endpoint set.
 * pMsg:   request. When the dnode is not running and the message type is
 *         below TDMT_SYNC_MSG_MIN, its content is freed and pCont is set to
 *         NULL.
 *
 * Returns the rpcSendRequest() result, or TSDB_CODE_APP_IS_STARTING /
 * TSDB_CODE_APP_IS_STOPPING when the request is rejected locally.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool mayToSend = (pDnode->status == DND_STAT_RUNNING) || (pMsg->msgType >= TDMT_SYNC_MSG_MIN);
  if (mayToSend) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
368

369
/* Forward to rpcRegisterBrokenLinkArg(); its result is deliberately discarded. */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
370

371
/* Forward to rpcReleaseHandle(); its result is deliberately discarded. */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  (void)rpcReleaseHandle(pHandle, type);
}
372

373
/*
 * Predicate installed as SRpcInit.rfp by the client initializers.
 *
 * Returns true when `code` is one of the listed network / leadership /
 * startup / shutdown codes AND `msgType` is not one of the excluded
 * scheduler or TTL message types; false otherwise.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool codeMatches =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
      code == TSDB_CODE_APP_IS_STOPPING;
  if (!codeMatches) {
    return false;
  }

  const bool excludedType = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                            msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                            msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedType;
}
387
/*
 * Predicate installed as SRpcInit.noDelayFp by dmInitClient().
 *
 * Returns true for the TTL-expiry fetch, S3 migrate, compact-progress query
 * and TTL table drop message types; false for everything else.
 *
 * NOTE(review): TDMT_VND_S3MIGRATE used to be tested twice in this condition.
 * The duplicate is removed, which does not change the result. Confirm whether
 * a different message type was intended in its place.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
    return true;
  }
  return false;
}
394
/*
 * Open the general-purpose dnode RPC client ("DNODE-CLI") and store it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Connection limit, clamped to the range [10, 500].
  int32_t connLimit = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimit = TMAX(connLimit, 10);
  connLimit = TMIN(connLimit, 500);

  // Fields not listed here stay zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimit,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
  return terrno;
}
448
/*
 * Open the single-threaded status RPC client ("DNODE-STA-CLI") and store it
 * in pDnode->trans.statusRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed starting value, passed through the same [10, 500] clamp as the
  // other clients.
  int32_t connLimit = 100;
  connLimit = TMAX(connLimit, 10);
  connLimit = TMIN(connLimit, 500);

  // Fields not listed here stay zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimit,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc != NULL) {
    dDebug("dnode rpc status client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
  return terrno;
}
497

498
/*
 * Open the sync RPC client ("DNODE-SYNC-CLI") and store it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's quota, clamped to the range [10, 500].
  int32_t connLimit = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimit = TMAX(connLimit, 10);
  connLimit = TMIN(connLimit, 500);

  // Fields not listed here stay zero-initialized.
  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimit,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc != NULL) {
    dDebug("dnode rpc sync client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
  return terrno;
}
547

548
/* Close the general RPC client if it is open and clear the stored handle. */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
556
/* Close the status RPC client if it is open and clear the stored handle. */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
564
/* Close the sync RPC client if it is open and clear the stored handle. */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
572

573
/*
 * Open the dnode RPC server ("DND-S") on tsLocalFqdn:tsServerPort and store
 * it in pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fields not listed here stay zero-initialized.
  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
  };
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc != NULL) {
    dDebug("dnode rpc server is initialized");
    return 0;
  }

  dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
  return terrno;
}
602

603
/* Close the RPC server if it is open and clear the stored handle. */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
611

612
/*
 * Build the callback table handed to the node modules.
 *
 * The four RPC handles are copied from pDnode->trans as they are at the time
 * of the call. The function pointers refer to the send/release helpers in
 * this file and to dnode helpers declared elsewhere. `data` points at
 * pDnode->data.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb callbacks = {0};

  callbacks.clientRpc = pDnode->trans.clientRpc;
  callbacks.serverRpc = pDnode->trans.serverRpc;
  callbacks.statusRpc = pDnode->trans.statusRpc;
  callbacks.syncRpc = pDnode->trans.syncRpc;

  callbacks.sendReqFp = dmSendReq;
  callbacks.sendSyncReqFp = dmSendSyncReq;
  callbacks.sendRspFp = dmSendRsp;
  callbacks.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dmReleaseHandle;
  callbacks.reportStartupFp = dmReportStartup;
  callbacks.updateDnodeInfoFp = dmUpdateDnodeInfo;
  callbacks.getDnodeEpFp = dmGetDnodeEp;

  callbacks.data = &pDnode->data;

  return callbacks;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc