• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

UNCOV
22
static inline void dmSendRsp(SRpcMsg *pMsg) {
×
UNCOV
23
  if (rpcSendResponse(pMsg) != 0) {
×
24
    dError("failed to send response, msg:%p", pMsg);
×
25
  }
UNCOV
26
}
×
27

UNCOV
28
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
×
UNCOV
29
  SEpSet epSet = {0};
×
UNCOV
30
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
×
31

UNCOV
32
  if (epSet.numOfEps <= 1) {
×
UNCOV
33
    if (epSet.numOfEps == 0) {
×
34
      pMsg->pCont = NULL;
×
35
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
36
      return;
×
37
    }
38
    // dnode is not the mnode or mnode leader  and This ensures that the function correctly handles cases where the
39
    // dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
UNCOV
40
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
×
41
      pMsg->pCont = NULL;
×
42
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
43
      return;
×
44
    }
45
  }
46

UNCOV
47
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
×
UNCOV
48
  pMsg->pCont = rpcMallocCont(contLen);
×
UNCOV
49
  if (pMsg->pCont == NULL) {
×
50
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
51
  } else {
UNCOV
52
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
×
UNCOV
53
    if (contLen < 0) {
×
54
      pMsg->code = contLen;
×
55
      return;
×
56
    }
UNCOV
57
    pMsg->contLen = contLen;
×
58
  }
59
}
60

UNCOV
61
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
×
UNCOV
62
  const STraceId *trace = &pMsg->info.traceId;
×
63

UNCOV
64
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
×
UNCOV
65
  if (msgFp == NULL) {
×
66
    // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
67
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
×
68
    return TSDB_CODE_MSG_NOT_PROCESSED;
×
69
  }
70

UNCOV
71
  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
×
UNCOV
72
  pMsg->info.wrapper = pWrapper;
×
UNCOV
73
  return (*msgFp)(pWrapper->pMgmt, pMsg);
×
74
}
75

76
static bool dmFailFastFp(tmsg_t msgType) {
×
77
  // add more msg type later
78
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
×
79
}
80

UNCOV
81
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
×
UNCOV
82
  if (code != TSDB_CODE_APP_IS_STOPPING) {
×
UNCOV
83
    return code;
×
84
  }
UNCOV
85
  if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
×
UNCOV
86
      (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
×
UNCOV
87
    code = TSDB_CODE_VND_STOPPED;
×
88
  }
UNCOV
89
  return code;
×
90
}
UNCOV
91
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
UNCOV
92
  int32_t        code = 0;
×
UNCOV
93
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
×
UNCOV
94
  code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
×
UNCOV
95
  if (code < 0) {
×
96
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
97
    return;
×
98
  }
UNCOV
99
  code = rpcSetIpWhite(pTrans, &ipWhite);
×
UNCOV
100
  pData->ipWhiteVer = ipWhite.ver;
×
101

UNCOV
102
  (void)tFreeSUpdateIpWhiteReq(&ipWhite);
×
103

UNCOV
104
  rpcFreeCont(pRpc->pCont);
×
105
}
UNCOV
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
×
UNCOV
107
  if (forbidden) {
×
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
UNCOV
115
    return false;
×
116
  }
117
}
118

UNCOV
119
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
UNCOV
120
  SRetrieveAnalAlgoRsp rsp = {0};
×
UNCOV
121
  if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
×
UNCOV
122
    taosAnalUpdate(rsp.ver, rsp.hash);
×
UNCOV
123
    rsp.hash = NULL;
×
124
  }
UNCOV
125
  tFreeRetrieveAnalAlgoRsp(&rsp);
×
UNCOV
126
  rpcFreeCont(pRpc->pCont);
×
UNCOV
127
}
×
128

UNCOV
129
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
×
UNCOV
130
  SDnodeTrans  *pTrans = &pDnode->trans;
×
UNCOV
131
  int32_t       code = -1;
×
UNCOV
132
  SRpcMsg      *pMsg = NULL;
×
UNCOV
133
  SMgmtWrapper *pWrapper = NULL;
×
UNCOV
134
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
×
135

UNCOV
136
  const STraceId *trace = &pRpc->info.traceId;
×
UNCOV
137
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
×
138
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
139

UNCOV
140
  int32_t svrVer = 0;
×
UNCOV
141
  code = taosVersionStrToInt(td_version, &svrVer);
×
UNCOV
142
  if (code != 0) {
×
143
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
×
144
    goto _OVER;
×
145
  }
UNCOV
146
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
×
147
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
×
148
           pRpc->info.conn.clientIp);
149
    goto _OVER;
×
150
  }
151

UNCOV
152
  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
×
UNCOV
153
  if (isForbidden) {
×
154
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
155
    goto _OVER;
×
156
  }
157

UNCOV
158
  switch (pRpc->msgType) {
×
159
    case TDMT_DND_NET_TEST:
×
160
      dmProcessNetTestReq(pDnode, pRpc);
×
UNCOV
161
      return;
×
UNCOV
162
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
×
163
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
164
    case TDMT_SCH_FETCH_RSP:
165
    case TDMT_SCH_MERGE_FETCH_RSP:
166
    case TDMT_VND_SUBMIT_RSP:
UNCOV
167
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
×
UNCOV
168
      return;
×
169
    case TDMT_MND_STATUS_RSP:
×
170
      if (pEpSet != NULL) {
×
171
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
×
172
      }
173
      break;
×
UNCOV
174
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
×
UNCOV
175
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
×
UNCOV
176
      return;
×
UNCOV
177
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
×
UNCOV
178
      dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
×
UNCOV
179
      return;
×
UNCOV
180
    default:
×
UNCOV
181
      break;
×
182
  }
183

184
  /*
185
  pDnode is null, TD-22618
186
  at trans.c line 91
187
  before this line, dmProcessRpcMsg callback is set
188
  after this line, parent is set
189
  so when dmProcessRpcMsg is called, pDonde is still null.
190
  */
UNCOV
191
  if (pDnode != NULL) {
×
UNCOV
192
    if (pDnode->status != DND_STAT_RUNNING) {
×
UNCOV
193
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
×
194
        dmProcessServerStartupStatus(pDnode, pRpc);
×
195
        return;
×
196
      } else {
UNCOV
197
        if (pDnode->status == DND_STAT_INIT) {
×
UNCOV
198
          code = TSDB_CODE_APP_IS_STARTING;
×
199
        } else {
UNCOV
200
          code = TSDB_CODE_APP_IS_STOPPING;
×
201
        }
UNCOV
202
        goto _OVER;
×
203
      }
204
    }
205
  } else {
206
    code = TSDB_CODE_APP_IS_STARTING;
×
207
    goto _OVER;
×
208
  }
209

UNCOV
210
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
×
211
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
×
212
    code = TSDB_CODE_INVALID_MSG_LEN;
×
213
    goto _OVER;
×
UNCOV
214
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
×
UNCOV
215
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
×
UNCOV
216
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
×
217
  }
218

UNCOV
219
  if (pHandle->defaultNtype == NODE_END) {
×
220
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
×
221
    code = TSDB_CODE_MSG_NOT_PROCESSED;
×
222
    goto _OVER;
×
223
  }
224

UNCOV
225
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
×
UNCOV
226
  if (pHandle->needCheckVgId) {
×
UNCOV
227
    if (pRpc->contLen > 0) {
×
UNCOV
228
      const SMsgHead *pHead = pRpc->pCont;
×
UNCOV
229
      const int32_t   vgId = ntohl(pHead->vgId);
×
UNCOV
230
      switch (vgId) {
×
UNCOV
231
        case QNODE_HANDLE:
×
UNCOV
232
          pWrapper = &pDnode->wrappers[QNODE];
×
UNCOV
233
          break;
×
UNCOV
234
        case SNODE_HANDLE:
×
UNCOV
235
          pWrapper = &pDnode->wrappers[SNODE];
×
UNCOV
236
          break;
×
UNCOV
237
        case MNODE_HANDLE:
×
UNCOV
238
          pWrapper = &pDnode->wrappers[MNODE];
×
UNCOV
239
          break;
×
UNCOV
240
        default:
×
UNCOV
241
          break;
×
242
      }
243
    } else {
244
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
×
245
      code = TSDB_CODE_INVALID_MSG_LEN;
×
246
      goto _OVER;
×
247
    }
248
  }
249

UNCOV
250
  if ((code = dmMarkWrapper(pWrapper)) != 0) {
×
UNCOV
251
    pWrapper = NULL;
×
UNCOV
252
    goto _OVER;
×
253
  }
254

UNCOV
255
  pRpc->info.wrapper = pWrapper;
×
256

UNCOV
257
  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
×
UNCOV
258
  if (IsReq(pRpc) && pRpc->msgType != TDMT_SYNC_HEARTBEAT && pRpc->msgType != TDMT_SYNC_HEARTBEAT_REPLY)
×
UNCOV
259
    itype = RPC_QITEM;
×
UNCOV
260
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
×
UNCOV
261
  if (code) goto _OVER;
×
262

UNCOV
263
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
×
UNCOV
264
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
×
265
          pRpc->contLen);
266

UNCOV
267
  code = dmProcessNodeMsg(pWrapper, pMsg);
×
268

UNCOV
269
_OVER:
×
UNCOV
270
  if (code != 0) {
×
UNCOV
271
    code = dmConvertErrCode(pRpc->msgType, code);
×
UNCOV
272
    if (pMsg) {
×
UNCOV
273
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
×
274
    } else {
UNCOV
275
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
×
276
    }
277

UNCOV
278
    if (IsReq(pRpc)) {
×
UNCOV
279
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
×
UNCOV
280
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
×
UNCOV
281
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
×
282
      }
283

UNCOV
284
      if (pWrapper != NULL) {
×
UNCOV
285
        dmSendRsp(&rsp);
×
286
      } else {
UNCOV
287
        if (rpcSendResponse(&rsp) != 0) {
×
288
          dError("failed to send response, msg:%p", &rsp);
×
289
        }
290
      }
291
    }
292

UNCOV
293
    if (pMsg != NULL) {
×
UNCOV
294
      dGTrace("msg:%p, is freed", pMsg);
×
UNCOV
295
      taosFreeQitem(pMsg);
×
296
    }
UNCOV
297
    rpcFreeCont(pRpc->pCont);
×
UNCOV
298
    pRpc->pCont = NULL;
×
299
  }
300

UNCOV
301
  dmReleaseWrapper(pWrapper);
×
302
}
303

UNCOV
304
int32_t dmInitMsgHandle(SDnode *pDnode) {
×
UNCOV
305
  SDnodeTrans *pTrans = &pDnode->trans;
×
306

UNCOV
307
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
×
UNCOV
308
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
×
UNCOV
309
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
×
UNCOV
310
    if (pArray == NULL) return -1;
×
311

UNCOV
312
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
UNCOV
313
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
×
UNCOV
314
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
×
UNCOV
315
      if (pMgmt->needCheckVgId) {
×
UNCOV
316
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
×
317
      }
UNCOV
318
      if (!pMgmt->needCheckVgId) {
×
UNCOV
319
        pHandle->defaultNtype = ntype;
×
320
      }
UNCOV
321
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
×
322
    }
323

UNCOV
324
    taosArrayDestroy(pArray);
×
325
  }
326

UNCOV
327
  return 0;
×
328
}
329

UNCOV
330
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
×
UNCOV
331
  int32_t code = 0;
×
UNCOV
332
  SDnode *pDnode = dmInstance();
×
UNCOV
333
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
×
UNCOV
334
    rpcFreeCont(pMsg->pCont);
×
UNCOV
335
    pMsg->pCont = NULL;
×
UNCOV
336
    if (pDnode->status == DND_STAT_INIT) {
×
337
      code = TSDB_CODE_APP_IS_STARTING;
×
338
    } else {
UNCOV
339
      code = TSDB_CODE_APP_IS_STOPPING;
×
340
    }
UNCOV
341
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
342
           pMsg->info.handle);
UNCOV
343
    return code;
×
344
  } else {
UNCOV
345
    pMsg->info.handle = 0;
×
UNCOV
346
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
×
UNCOV
347
      dError("failed to send rpc msg");
×
348
    }
UNCOV
349
    return 0;
×
350
  }
351
}
UNCOV
352
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
×
UNCOV
353
  int32_t code = 0;
×
UNCOV
354
  SDnode *pDnode = dmInstance();
×
UNCOV
355
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
×
356
    rpcFreeCont(pMsg->pCont);
×
357
    pMsg->pCont = NULL;
×
358
    if (pDnode->status == DND_STAT_INIT) {
×
359
      code = TSDB_CODE_APP_IS_STARTING;
×
360
    } else {
361
      code = TSDB_CODE_APP_IS_STOPPING;
×
362
    }
363
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
364
           pMsg->info.handle);
365
    return code;
×
366
  } else {
UNCOV
367
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
×
368
  }
369
}
370

UNCOV
371
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
×
372

UNCOV
373
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
×
UNCOV
374
  (void)rpcReleaseHandle(pHandle, type, status);
×
UNCOV
375
}
×
376

UNCOV
377
static bool rpcRfp(int32_t code, tmsg_t msgType) {
×
UNCOV
378
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
×
UNCOV
379
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
×
UNCOV
380
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
×
381
      code == TSDB_CODE_APP_IS_STOPPING) {
UNCOV
382
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
×
UNCOV
383
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE) {
×
UNCOV
384
      return false;
×
385
    }
UNCOV
386
    return true;
×
387
  } else {
UNCOV
388
    return false;
×
389
  }
390
}
391
static bool rpcNoDelayMsg(tmsg_t msgType) {
×
392
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
×
393
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
×
394
    return true;
×
395
  }
396
  return false;
×
397
}
UNCOV
398
int32_t dmInitClient(SDnode *pDnode) {
×
UNCOV
399
  SDnodeTrans *pTrans = &pDnode->trans;
×
400

UNCOV
401
  SRpcInit rpcInit = {0};
×
UNCOV
402
  rpcInit.label = "DNODE-CLI";
×
UNCOV
403
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
×
UNCOV
404
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
×
UNCOV
405
  rpcInit.sessions = 1024;
×
UNCOV
406
  rpcInit.connType = TAOS_CONN_CLIENT;
×
UNCOV
407
  rpcInit.user = TSDB_DEFAULT_USER;
×
UNCOV
408
  rpcInit.idleTime = tsShellActivityTimer * 1000;
×
UNCOV
409
  rpcInit.parent = pDnode;
×
UNCOV
410
  rpcInit.rfp = rpcRfp;
×
UNCOV
411
  rpcInit.compressSize = tsCompressMsgSize;
×
UNCOV
412
  rpcInit.dfp = destroyAhandle;
×
413

UNCOV
414
  rpcInit.retryMinInterval = tsRedirectPeriod;
×
UNCOV
415
  rpcInit.retryStepFactor = tsRedirectFactor;
×
UNCOV
416
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
×
UNCOV
417
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
×
418

UNCOV
419
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
×
UNCOV
420
  rpcInit.failFastThreshold = 3;    // failed threshold
×
UNCOV
421
  rpcInit.ffp = dmFailFastFp;
×
422

UNCOV
423
  rpcInit.noDelayFp = rpcNoDelayMsg;
×
424

UNCOV
425
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
×
UNCOV
426
  connLimitNum = TMAX(connLimitNum, 10);
×
UNCOV
427
  connLimitNum = TMIN(connLimitNum, 500);
×
428

UNCOV
429
  rpcInit.connLimitNum = connLimitNum;
×
UNCOV
430
  rpcInit.connLimitLock = 1;
×
UNCOV
431
  rpcInit.supportBatch = 1;
×
UNCOV
432
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
×
UNCOV
433
  rpcInit.shareConn = 1;
×
UNCOV
434
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
×
UNCOV
435
  rpcInit.notWaitAvaliableConn = 0;
×
UNCOV
436
  rpcInit.startReadTimer = 1;
×
UNCOV
437
  rpcInit.readTimeout = tsReadTimeout;
×
438

UNCOV
439
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
×
440
    dError("failed to convert version string:%s to int", td_version);
×
441
  }
442

UNCOV
443
  pTrans->clientRpc = rpcOpen(&rpcInit);
×
UNCOV
444
  if (pTrans->clientRpc == NULL) {
×
445
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
×
446
    return terrno;
×
447
  }
448

UNCOV
449
  dDebug("dnode rpc client is initialized");
×
UNCOV
450
  return 0;
×
451
}
UNCOV
452
int32_t dmInitStatusClient(SDnode *pDnode) {
×
UNCOV
453
  SDnodeTrans *pTrans = &pDnode->trans;
×
454

UNCOV
455
  SRpcInit rpcInit = {0};
×
UNCOV
456
  rpcInit.label = "DNODE-STA-CLI";
×
UNCOV
457
  rpcInit.numOfThreads = 1;
×
UNCOV
458
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
×
UNCOV
459
  rpcInit.sessions = 1024;
×
UNCOV
460
  rpcInit.connType = TAOS_CONN_CLIENT;
×
UNCOV
461
  rpcInit.user = TSDB_DEFAULT_USER;
×
UNCOV
462
  rpcInit.idleTime = tsShellActivityTimer * 1000;
×
UNCOV
463
  rpcInit.parent = pDnode;
×
UNCOV
464
  rpcInit.rfp = rpcRfp;
×
UNCOV
465
  rpcInit.compressSize = tsCompressMsgSize;
×
466

UNCOV
467
  rpcInit.retryMinInterval = tsRedirectPeriod;
×
UNCOV
468
  rpcInit.retryStepFactor = tsRedirectFactor;
×
UNCOV
469
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
×
UNCOV
470
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
×
471

UNCOV
472
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
×
UNCOV
473
  rpcInit.failFastThreshold = 3;    // failed threshold
×
UNCOV
474
  rpcInit.ffp = dmFailFastFp;
×
475

UNCOV
476
  int32_t connLimitNum = 100;
×
UNCOV
477
  connLimitNum = TMAX(connLimitNum, 10);
×
UNCOV
478
  connLimitNum = TMIN(connLimitNum, 500);
×
479

UNCOV
480
  rpcInit.connLimitNum = connLimitNum;
×
UNCOV
481
  rpcInit.connLimitLock = 1;
×
UNCOV
482
  rpcInit.supportBatch = 1;
×
UNCOV
483
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
×
UNCOV
484
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
×
UNCOV
485
  rpcInit.startReadTimer = 0;
×
UNCOV
486
  rpcInit.readTimeout = 0;
×
487

UNCOV
488
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
×
489
    dError("failed to convert version string:%s to int", td_version);
×
490
  }
491

UNCOV
492
  pTrans->statusRpc = rpcOpen(&rpcInit);
×
UNCOV
493
  if (pTrans->statusRpc == NULL) {
×
494
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
×
495
    return terrno;
×
496
  }
497

UNCOV
498
  dDebug("dnode rpc status client is initialized");
×
UNCOV
499
  return 0;
×
500
}
501

UNCOV
502
int32_t dmInitSyncClient(SDnode *pDnode) {
×
UNCOV
503
  SDnodeTrans *pTrans = &pDnode->trans;
×
504

UNCOV
505
  SRpcInit rpcInit = {0};
×
UNCOV
506
  rpcInit.label = "DNODE-SYNC-CLI";
×
UNCOV
507
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
×
UNCOV
508
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
×
UNCOV
509
  rpcInit.sessions = 1024;
×
UNCOV
510
  rpcInit.connType = TAOS_CONN_CLIENT;
×
UNCOV
511
  rpcInit.user = TSDB_DEFAULT_USER;
×
UNCOV
512
  rpcInit.idleTime = tsShellActivityTimer * 1000;
×
UNCOV
513
  rpcInit.parent = pDnode;
×
UNCOV
514
  rpcInit.rfp = rpcRfp;
×
UNCOV
515
  rpcInit.compressSize = tsCompressMsgSize;
×
516

UNCOV
517
  rpcInit.retryMinInterval = tsRedirectPeriod;
×
UNCOV
518
  rpcInit.retryStepFactor = tsRedirectFactor;
×
UNCOV
519
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
×
UNCOV
520
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
×
521

UNCOV
522
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
×
UNCOV
523
  rpcInit.failFastThreshold = 3;    // failed threshold
×
UNCOV
524
  rpcInit.ffp = dmFailFastFp;
×
525

UNCOV
526
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
×
UNCOV
527
  connLimitNum = TMAX(connLimitNum, 10);
×
UNCOV
528
  connLimitNum = TMIN(connLimitNum, 500);
×
529

UNCOV
530
  rpcInit.connLimitNum = connLimitNum;
×
UNCOV
531
  rpcInit.connLimitLock = 1;
×
UNCOV
532
  rpcInit.supportBatch = 1;
×
UNCOV
533
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
×
UNCOV
534
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
×
UNCOV
535
  rpcInit.startReadTimer = 1;
×
UNCOV
536
  rpcInit.readTimeout = tsReadTimeout;
×
537

UNCOV
538
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
×
539
    dError("failed to convert version string:%s to int", td_version);
×
540
  }
541

UNCOV
542
  pTrans->syncRpc = rpcOpen(&rpcInit);
×
UNCOV
543
  if (pTrans->syncRpc == NULL) {
×
544
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
×
545
    return terrno;
×
546
  }
547

UNCOV
548
  dDebug("dnode rpc sync client is initialized");
×
UNCOV
549
  return 0;
×
550
}
551

UNCOV
552
void dmCleanupClient(SDnode *pDnode) {
×
UNCOV
553
  SDnodeTrans *pTrans = &pDnode->trans;
×
UNCOV
554
  if (pTrans->clientRpc) {
×
UNCOV
555
    rpcClose(pTrans->clientRpc);
×
UNCOV
556
    pTrans->clientRpc = NULL;
×
UNCOV
557
    dDebug("dnode rpc client is closed");
×
558
  }
UNCOV
559
}
×
UNCOV
560
void dmCleanupStatusClient(SDnode *pDnode) {
×
UNCOV
561
  SDnodeTrans *pTrans = &pDnode->trans;
×
UNCOV
562
  if (pTrans->statusRpc) {
×
UNCOV
563
    rpcClose(pTrans->statusRpc);
×
UNCOV
564
    pTrans->statusRpc = NULL;
×
UNCOV
565
    dDebug("dnode rpc status client is closed");
×
566
  }
UNCOV
567
}
×
UNCOV
568
void dmCleanupSyncClient(SDnode *pDnode) {
×
UNCOV
569
  SDnodeTrans *pTrans = &pDnode->trans;
×
UNCOV
570
  if (pTrans->syncRpc) {
×
UNCOV
571
    rpcClose(pTrans->syncRpc);
×
UNCOV
572
    pTrans->syncRpc = NULL;
×
UNCOV
573
    dDebug("dnode rpc sync client is closed");
×
574
  }
UNCOV
575
}
×
576

UNCOV
577
int32_t dmInitServer(SDnode *pDnode) {
×
UNCOV
578
  SDnodeTrans *pTrans = &pDnode->trans;
×
579

UNCOV
580
  SRpcInit rpcInit = {0};
×
UNCOV
581
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
×
UNCOV
582
  rpcInit.localPort = tsServerPort;
×
UNCOV
583
  rpcInit.label = "DND-S";
×
UNCOV
584
  rpcInit.numOfThreads = tsNumOfRpcThreads;
×
UNCOV
585
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
×
UNCOV
586
  rpcInit.sessions = tsMaxShellConns;
×
UNCOV
587
  rpcInit.connType = TAOS_CONN_SERVER;
×
UNCOV
588
  rpcInit.idleTime = tsShellActivityTimer * 1000;
×
UNCOV
589
  rpcInit.parent = pDnode;
×
UNCOV
590
  rpcInit.compressSize = tsCompressMsgSize;
×
UNCOV
591
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
×
592

UNCOV
593
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
×
594
    dError("failed to convert version string:%s to int", td_version);
×
595
  }
596

UNCOV
597
  pTrans->serverRpc = rpcOpen(&rpcInit);
×
UNCOV
598
  if (pTrans->serverRpc == NULL) {
×
UNCOV
599
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
×
UNCOV
600
    return terrno;
×
601
  }
602

UNCOV
603
  dDebug("dnode rpc server is initialized");
×
UNCOV
604
  return 0;
×
605
}
606

UNCOV
607
void dmCleanupServer(SDnode *pDnode) {
×
UNCOV
608
  SDnodeTrans *pTrans = &pDnode->trans;
×
UNCOV
609
  if (pTrans->serverRpc) {
×
UNCOV
610
    rpcClose(pTrans->serverRpc);
×
UNCOV
611
    pTrans->serverRpc = NULL;
×
UNCOV
612
    dDebug("dnode rpc server is closed");
×
613
  }
UNCOV
614
}
×
615

UNCOV
616
SMsgCb dmGetMsgcb(SDnode *pDnode) {
×
UNCOV
617
  SMsgCb msgCb = {
×
UNCOV
618
      .clientRpc = pDnode->trans.clientRpc,
×
UNCOV
619
      .serverRpc = pDnode->trans.serverRpc,
×
UNCOV
620
      .statusRpc = pDnode->trans.statusRpc,
×
UNCOV
621
      .syncRpc = pDnode->trans.syncRpc,
×
622
      .sendReqFp = dmSendReq,
623
      .sendSyncReqFp = dmSendSyncReq,
624
      .sendRspFp = dmSendRsp,
625
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
626
      .releaseHandleFp = dmReleaseHandle,
627
      .reportStartupFp = dmReportStartup,
628
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
629
      .getDnodeEpFp = dmGetDnodeEp,
UNCOV
630
      .data = &pDnode->data,
×
631
  };
UNCOV
632
  return msgCb;
×
633
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc