• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.92
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
// Hand a response to the RPC layer; a non-zero result from rpcSendResponse()
// is only logged, nothing is reported back to the caller.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  int32_t sendRet = rpcSendResponse(pMsg);
  if (sendRet == 0) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
12,911✔
27

28
// Build the payload of a "mnode not found" response: the serialized mnode
// endpoint set obtained from dmGetMnodeEpSetForRedirect().
//
// On success pMsg->pCont/contLen carry the serialized SEpSet. On every failure
// path pMsg->pCont is NULL and pMsg->code holds the reason, so the response
// never carries a half-written or dangling buffer.
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The only known endpoint is this dnode itself, which is not the mnode (or
    // not its leader). Redirecting the client back here would be misleading,
    // so report "mnode not found" without an epSet.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    // Do not feed a negative length to the allocator.
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  int32_t encodedLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (encodedLen < 0) {
    // Release the buffer instead of leaving it attached with contLen unset.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = encodedLen;
    return;
  }
  pMsg->contLen = encodedLen;
}
60

61
// Dispatch pMsg to the handler that pWrapper registered for its message type.
// Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the
// wrapper has no handler for this type.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not referenced directly below; presumably picked up by name inside the
  // dG* logging macros, so the identifier must stay `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handlerFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handlerFp != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return handlerFp(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
75

76
// Predicate installed as SRpcInit.ffp: true for sync heartbeat and sync
// append-entries messages, false for every other type.
static bool dmFailFastFp(tmsg_t msgType) {
  // add more msg type later
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
80

81
// Map TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
// strictly inside the VND or SCH ranges; every other code passes through.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    bool inSchRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
91
// Apply an ip-white-list update carried in pRpc to the transport pTrans and
// record its version in pData->ipWhiteVer.
//
// This function consumes pRpc->pCont: the caller returns right after calling
// it, so the payload is released here on every path, including the
// deserialize-failure path that previously leaked it.
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): ipWhite may be partially populated here; whether it is safe
    // to pass to tFreeSUpdateIpWhiteReq() after a failed decode is not visible
    // from this file, so only the RPC payload is released.
    rpcFreeCont(pRpc->pCont);
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // Previously discarded silently.
    dError("failed to set rpc ip-white, code:0x%x", code);
  }
  // NOTE(review): the version is recorded even when rpcSetIpWhite() fails, as
  // in the original code; confirm that is intended.
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
54,200,075✔
107
  if (forbidden) {
54,200,075!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
54,200,075✔
116
  }
117
}
118

119
// Decode an analysis-algorithm response and, if decoding succeeds, pass its
// version and hash to taosAnalUpdate(). The response object and the RPC
// payload are released in all cases.
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalAlgoRsp algoRsp = {0};

  int32_t decodeRet = tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp);
  if (decodeRet == 0) {
    taosAnalUpdate(algoRsp.ver, algoRsp.hash);
    // Detach the hash before the free below; presumably taosAnalUpdate() now
    // owns it -- TODO confirm.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
×
128

129
// RPC callback (installed as SRpcInit.cfp) for every message delivered to this
// dnode.
//
// Flow:
//   1. Reject clients with an incompatible version or a forbidden IP.
//   2. Handle a few message types inline (net test, query-worker responses,
//      mnode status / ip-white / analysis-algorithm responses).
//   3. Otherwise pick the management wrapper for the message, copy the message
//      into a queue item and pass it to dmProcessNodeMsg().
//   4. On any failure (label _OVER): answer requests with the error code, free
//      the queue item and the payload.
//
// pDnode may be NULL while the transport is still being set up (TD-22618), so
// it is only dereferenced after a NULL check.
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = NULL;

  // Presumably picked up by name inside the dG* logging macros; keep the name.
  const STraceId *trace = &pRpc->info.traceId;
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
           pRpc->info.conn.clientIp);
    goto _OVER;
  }

  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
  if (isForbidden) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
      // The result is not used: this path returns immediately.
      (void)qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL && pDnode != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      if (pDnode == NULL) {
        break;  // handled by the NULL check below (payload freed at _OVER)
      }
      dmUpdateRpcIpWhite(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      if (pDnode == NULL) {
        break;  // handled by the NULL check below (payload freed at _OVER)
      }
      dmUpdateAnalFunc(&pDnode->data, pDnode->trans.serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
    code = pRpc->code;
    goto _OVER;
  }

  // Looked up only now that pDnode is known to be non-NULL.
  pHandle = &pDnode->trans.msgHandles[TMSG_INDEX(pRpc->msgType)];
  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head may redirect the message to the
      // qnode/snode/mnode wrapper instead of the default one.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Not marked, so it must not be released at the end.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen);

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // dmSendRsp() sends and logs on failure, which is what both of the
      // former pWrapper != NULL / == NULL branches did.
      dmSendRsp(&rsp);
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
303

304
// Populate the dnode dispatch tables from the handler lists of every node
// type in [DNODE, NODE_END).
//
// For each handler entry: entries that need a vgId check set the flag on the
// shared per-message handle, the others set the default node type; the
// wrapper's function table always receives the entry's handler.
// Returns 0 on success, -1 if a node type yields no handler array.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandles); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandles, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
329

330
// Send a request through the dnode's client RPC handle.
//
// While the dnode is not running, messages whose type is below
// TDMT_SYNC_MSG_MIN are rejected: the payload is freed, pMsg->pCont is cleared
// and TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
// Otherwise the result of rpcSendRequest() is returned, so a failed send is
// reported to the caller instead of being masked as success (this matches
// dmSendSyncReq()).
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    if (pDnode->status == DND_STAT_INIT) {
      code = TSDB_CODE_APP_IS_STARTING;
    } else {
      code = TSDB_CODE_APP_IS_STOPPING;
    }
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
           pMsg->info.handle);
    return code;
  }

  pMsg->info.handle = 0;
  code = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
  if (code != 0) {
    dError("failed to send rpc msg");
  }
  return code;
}
352
// Send a request through the dnode's sync RPC handle and return the result of
// rpcSendRequest(). While the dnode is not running, messages whose type is
// below TDMT_SYNC_MSG_MIN are rejected instead: the payload is freed and a
// starting/stopping error code is returned.
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG_MIN) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
370

371
// Adapter used as SMsgCb.registerBrokenLinkArgFp: forwards to
// rpcRegisterBrokenLinkArg() and deliberately ignores its result.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
8,938,261✔
372

373
// Adapter used as SMsgCb.releaseHandleFp: forwards to rpcReleaseHandle() and
// deliberately ignores its result.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
  (void)rpcReleaseHandle(pHandle, type);
}
8,938,739✔
374

375
// Predicate installed as SRpcInit.rfp. Returns true when `code` is one of the
// listed network / leadership / start-stop errors AND `msgType` is not one of
// the scheduler query/fetch/notify or drop-ttl-table types; false otherwise.
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool matchedCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!matchedCode) {
    return false;
  }

  const bool excludedType = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                            msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                            msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedType;
}
389
// Predicate installed as SRpcInit.noDelayFp: true for the message types
// listed below, false for all others.
//
// NOTE(review): the original condition tested TDMT_VND_S3MIGRATE twice. The
// duplicate is removed here (no behavior change); if the second occurrence was
// meant to be a different message type, add it to the list.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
    return true;
  }
  return false;
}
396
// Open the general-purpose client RPC handle ("DNODE-CLI") and store it in
// pDnode->trans.clientRpc. Returns 0 on success, terrno if rpcOpen() fails.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
450
// Open the single-threaded status client RPC handle ("DNODE-STA-CLI") and
// store it in pDnode->trans.statusRpc. Returns 0 on success, terrno if
// rpcOpen() fails.
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed limit, passed through the same [10, 500] clamp as the other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
499

500
// Open the sync client RPC handle ("DNODE-SYNC-CLI") and store it in
// pDnode->trans.syncRpc. Returns 0 on success, terrno if rpcOpen() fails.
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
  };

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
549

550
// Close the general client RPC handle if it is open and clear the pointer.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
2,405✔
558
// Close the status client RPC handle if it is open and clear the pointer.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
2,405✔
566
// Close the sync client RPC handle if it is open and clear the pointer.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
2,405✔
574

575
// Open the server-side RPC handle ("DND-S") on tsLocalFqdn:tsServerPort and
// store it in pDnode->trans.serverRpc. Returns 0 on success, terrno if
// rpcOpen() fails.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
  };
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  // A conversion failure is only logged; initialization continues.
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
604

605
// Close the server RPC handle if it is open and clear the pointer.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
2,405✔
613

614
// Build the callback table handed to the management nodes: the four RPC
// handles currently stored in pDnode->trans, the send/release adapters defined
// in this file, the dnode-level helpers, and a pointer to pDnode->data.
// The struct is returned by value; fields not set here are zero.
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       cb = {0};

  cb.clientRpc = pTrans->clientRpc;
  cb.serverRpc = pTrans->serverRpc;
  cb.statusRpc = pTrans->statusRpc;
  cb.syncRpc = pTrans->syncRpc;

  cb.sendReqFp = dmSendReq;
  cb.sendSyncReqFp = dmSendSyncReq;
  cb.sendRspFp = dmSendRsp;
  cb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  cb.releaseHandleFp = dmReleaseHandle;
  cb.reportStartupFp = dmReportStartup;
  cb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  cb.getDnodeEpFp = dmGetDnodeEp;

  cb.data = &pDnode->data;
  return cb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc