• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4783

09 Oct 2025 07:30AM UTC coverage: 58.457% (+0.2%) from 58.252%
#4783

push

travis-ci

web-flow
Merge pull request #33183 from taosdata/fix/sort-release-note

fix: replace DocCardList with SortedDocCardList in release notes

138849 of 302745 branches covered (45.86%)

Branch coverage included in aggregate %.

210224 of 294403 relevant lines covered (71.41%)

16945440.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.11
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg) || TDMT_STREAM_TRIGGER_DROP_RSP == (_msg))
23

24
static inline void dmSendRsp(SRpcMsg *pMsg) {
12,945✔
25
  if (rpcSendResponse(pMsg) != 0) {
12,945!
26
    dError("failed to send response, msg:%p", pMsg);
×
27
  }
28
}
12,951✔
29

30
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
301✔
31
  SEpSet epSet = {0};
301✔
32
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
301✔
33

34
  if (epSet.numOfEps <= 1) {
297✔
35
    if (epSet.numOfEps == 0) {
148!
36
      pMsg->pCont = NULL;
×
37
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
38
      return;
×
39
    }
40
    // dnode is not the mnode or mnode leader  and This ensures that the function correctly handles cases where the
41
    // dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
42
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
148!
43
      pMsg->pCont = NULL;
×
44
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
45
      return;
×
46
    }
47
  }
48

49
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
297✔
50
  pMsg->pCont = rpcMallocCont(contLen);
295✔
51
  if (pMsg->pCont == NULL) {
293!
52
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
53
  } else {
54
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
293✔
55
    if (contLen < 0) {
298!
56
      pMsg->code = contLen;
×
57
      return;
×
58
    }
59
    pMsg->contLen = contLen;
298✔
60
  }
61
}
62

63
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
45,833,142✔
64
  const STraceId *trace = &pMsg->info.traceId;
45,833,142✔
65

66
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
45,833,142✔
67
  if (msgFp == NULL) {
45,833,142!
68
    // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
69
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
×
70
    return TSDB_CODE_MSG_NOT_PROCESSED;
×
71
  }
72

73
  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
45,833,142!
74
  pMsg->info.wrapper = pWrapper;
45,833,143✔
75
  return (*msgFp)(pWrapper->pMgmt, pMsg);
45,833,143✔
76
}
77

78
static bool dmFailFastFp(tmsg_t msgType) {
×
79
  // add more msg type later
80
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
×
81
}
82

83
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
56,796✔
84
  if (code != TSDB_CODE_APP_IS_STOPPING) {
56,796✔
85
    return code;
36,862✔
86
  }
87
  if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
19,934✔
88
      (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
8,043✔
89
    code = TSDB_CODE_VND_STOPPED;
5,169✔
90
  }
91
  return code;
19,934✔
92
}
93
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
7✔
94
  int32_t        code = 0;
7✔
95
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
7✔
96
  code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
7✔
97
  if (code < 0) {
7!
98
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
99
    return;
×
100
  }
101
  code = rpcSetIpWhite(pTrans, &ipWhite);
7✔
102
  pData->ipWhiteVer = ipWhite.ver;
7✔
103

104
  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);
7✔
105

106
  rpcFreeCont(pRpc->pCont);
7✔
107
}
108

109
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
×
110
  int32_t code = TSDB_CODE_INVALID_MSG;
×
111
  dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
112
  rpcFreeCont(pRpc->pCont);
×
113
  pRpc->pCont = NULL;
×
114
  return;
×
115
}
116
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
50,795,022✔
117
  if (forbidden) {
50,795,022!
118
    dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
×
119
    return true;
×
120
  } else {
121
    return false;
50,795,022✔
122
  }
123
}
124

125
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
126
  SRetrieveAnalyticAlgoRsp rsp = {0};
×
127
  if (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
×
128
    taosAnalyUpdate(rsp.ver, rsp.hash);
×
129
    rsp.hash = NULL;
×
130
  }
131
  tFreeRetrieveAnalyticAlgoRsp(&rsp);
×
132
  rpcFreeCont(pRpc->pCont);
×
133
}
×
134

135
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
50,816,539✔
136
  SDnodeTrans  *pTrans = &pDnode->trans;
50,816,539✔
137
  int32_t       code = -1;
50,816,539✔
138
  SRpcMsg      *pMsg = NULL;
50,816,539✔
139
  SMgmtWrapper *pWrapper = NULL;
50,816,539✔
140
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
50,816,539✔
141

142
  const STraceId *trace = &pRpc->info.traceId;
50,816,539✔
143
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64, TMSG_INFO(pRpc->msgType),
50,816,539!
144
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));
145

146
  int32_t svrVer = 0;
50,816,539✔
147
  code = taosVersionStrToInt(td_version, &svrVer);
50,816,539✔
148
  if (code != 0) {
50,834,294!
149
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
×
150
    goto _OVER;
×
151
  }
152
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
50,834,294!
153
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
×
154
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
155
    goto _OVER;
×
156
  }
157

158
  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr);
50,813,933✔
159
  if (isForbidden) {
50,799,054!
160
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
161
    goto _OVER;
×
162
  }
163

164
  switch (pRpc->msgType) {
50,799,054!
165
    case TDMT_DND_NET_TEST:
×
166
      dmProcessNetTestReq(pDnode, pRpc);
×
167
      return;
4,991,271✔
168
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
4,981,357✔
169
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
170
    case TDMT_SCH_FETCH_RSP:
171
    case TDMT_SCH_MERGE_FETCH_RSP:
172
    case TDMT_VND_SUBMIT_RSP:
173
    case TDMT_MND_GET_DB_INFO_RSP:
174
    case TDMT_STREAM_FETCH_RSP:
175
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
176
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
177
    case TDMT_VND_SNODE_DROP_TABLE_RSP:
178
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
4,981,357✔
179
      return;
4,991,264✔
180
    case TDMT_MND_STATUS_RSP:
×
181
      if (pEpSet != NULL) {
×
182
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
×
183
      }
184
      break;
×
185
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
×
186
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
×
187
      return;
×
188
    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
7✔
189
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
7✔
190
      return;
7✔
191
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
×
192
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
×
193
      return;
×
194
    default:
45,817,690✔
195
      break;
45,817,690✔
196
  }
197

198
  /*
199
  pDnode is null, TD-22618
200
  at trans.c line 91
201
  before this line, dmProcessRpcMsg callback is set
202
  after this line, parent is set
203
  so when dmProcessRpcMsg is called, pDonde is still null.
204
  */
205
  if (pDnode != NULL) {
45,817,690!
206
    if (pDnode->status != DND_STAT_RUNNING) {
45,820,498✔
207
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
43,537!
208
        dmProcessServerStartupStatus(pDnode, pRpc);
×
209
        return;
×
210
      } else {
211
        if (pDnode->status == DND_STAT_INIT) {
43,537✔
212
          code = TSDB_CODE_APP_IS_STARTING;
23,602✔
213
        } else {
214
          code = TSDB_CODE_APP_IS_STOPPING;
19,935✔
215
        }
216
        goto _OVER;
43,537✔
217
      }
218
    }
219
  } else {
220
    code = TSDB_CODE_APP_IS_STARTING;
×
221
    goto _OVER;
×
222
  }
223

224
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
45,776,961!
225
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
×
226
    code = TSDB_CODE_INVALID_MSG_LEN;
×
227
    goto _OVER;
×
228
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
45,776,961!
229
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
×
230
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
174!
231
  }
232

233
  if (pHandle->defaultNtype == NODE_END) {
45,776,961!
234
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
×
235
    code = TSDB_CODE_MSG_NOT_PROCESSED;
×
236
    goto _OVER;
×
237
  }
238

239
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
45,776,961✔
240
  if (pHandle->needCheckVgId) {
45,776,961✔
241
    if (pRpc->contLen > 0) {
31,176,176!
242
      const SMsgHead *pHead = pRpc->pCont;
31,206,901✔
243
      const int32_t   vgId = ntohl(pHead->vgId);
31,206,901✔
244
      switch (vgId) {
31,206,901!
245
        case QNODE_HANDLE:
×
246
          pWrapper = &pDnode->wrappers[QNODE];
×
247
          break;
×
248
        case SNODE_HANDLE:
23,126✔
249
          pWrapper = &pDnode->wrappers[SNODE];
23,126✔
250
          break;
23,126✔
251
        case MNODE_HANDLE:
894,879✔
252
          pWrapper = &pDnode->wrappers[MNODE];
894,879✔
253
          break;
894,879✔
254
        default:
30,288,896✔
255
          break;
30,288,896✔
256
      }
257
    } else {
258
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
×
259
      code = TSDB_CODE_INVALID_MSG_LEN;
×
260
      goto _OVER;
×
261
    }
262
  }
263

264
  if ((code = dmMarkWrapper(pWrapper)) != 0) {
45,807,686✔
265
    pWrapper = NULL;
298✔
266
    goto _OVER;
298✔
267
  }
268

269
  pRpc->info.wrapper = pWrapper;
45,833,334✔
270

271
  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
45,833,334✔
272
  if (IsReq(pRpc)) {
45,833,334✔
273
    if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY)
45,626,552✔
274
      itype = DEF_QITEM;
147,931✔
275
    else
276
      itype = RPC_QITEM;
45,478,621✔
277
  } else {
278
    itype = DEF_QITEM;
206,782✔
279
  }
280
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
45,833,334✔
281
  if (code) goto _OVER;
45,878,209!
282

283
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
45,878,209✔
284
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
45,878,209!
285
          pRpc->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
286

287
  code = dmProcessNodeMsg(pWrapper, pMsg);
45,878,212✔
288

289
_OVER:
45,901,029✔
290
  if (code != 0) {
45,901,029✔
291
    code = dmConvertErrCode(pRpc->msgType, code);
56,797✔
292
    if (pMsg) {
56,797✔
293
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
12,961!
294
    } else {
295
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
43,836!
296
    }
297

298
    if (IsReq(pRpc)) {
56,797✔
299
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
56,758✔
300
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
56,758✔
301
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
300✔
302
      }
303

304
      if (pWrapper != NULL) {
56,750✔
305
        dmSendRsp(&rsp);
12,948✔
306
      } else {
307
        if (rpcSendResponse(&rsp) != 0) {
43,802!
308
          dError("failed to send response, msg:%p", &rsp);
×
309
        }
310
      }
311
    } else if (NULL == pMsg && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
39!
312
      destroyAhandle(pRpc->info.ahandle);
6✔
313
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
6!
314
    }
315

316
    if (pMsg != NULL) {
56,798✔
317
      dGTrace("msg:%p, is freed", pMsg);
12,949!
318
      taosFreeQitem(pMsg);
12,949✔
319
    }
320
    rpcFreeCont(pRpc->pCont);
56,805✔
321
    pRpc->pCont = NULL;
56,806✔
322
  }
323

324
  dmReleaseWrapper(pWrapper);
45,901,038✔
325
}
326

327
int32_t dmInitMsgHandle(SDnode *pDnode) {
1,834✔
328
  SDnodeTrans *pTrans = &pDnode->trans;
1,834✔
329

330
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
12,838✔
331
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
11,004✔
332
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
11,004✔
333
    if (pArray == NULL) return -1;
11,004!
334

335
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
619,892✔
336
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
608,888✔
337
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
608,888✔
338
      if (pMgmt->needCheckVgId) {
608,888✔
339
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
71,526✔
340
      }
341
      if (!pMgmt->needCheckVgId) {
608,888✔
342
        pHandle->defaultNtype = ntype;
537,362✔
343
      }
344
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
608,888✔
345
    }
346

347
    taosArrayDestroy(pArray);
11,004✔
348
  }
349

350
  return 0;
1,834✔
351
}
352

353
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
508,553✔
354
  int32_t code = 0;
508,553✔
355
  SDnode *pDnode = dmInstance();
508,553✔
356
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
508,546!
357
    rpcFreeCont(pMsg->pCont);
1,563✔
358
    pMsg->pCont = NULL;
1,562✔
359
    if (pDnode->status == DND_STAT_INIT) {
1,562!
360
      code = TSDB_CODE_APP_IS_STARTING;
×
361
    } else {
362
      code = TSDB_CODE_APP_IS_STOPPING;
1,562✔
363
    }
364
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
1,562!
365
           pMsg->info.handle);
366
    return code;
1,563✔
367
  } else {
368
    pMsg->info.handle = 0;
506,983✔
369
    code = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
506,983✔
370
    if (code != 0) {
507,050✔
371
      dError("failed to send rpc msg");
2!
372
      return code;
×
373
    }
374
    return 0;
507,048✔
375
  }
376
}
377
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
5,913,108✔
378
  int32_t code = 0;
5,913,108✔
379
  SDnode *pDnode = dmInstance();
5,913,108✔
380
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
5,913,106!
381
    rpcFreeCont(pMsg->pCont);
×
382
    pMsg->pCont = NULL;
×
383
    if (pDnode->status == DND_STAT_INIT) {
×
384
      code = TSDB_CODE_APP_IS_STARTING;
×
385
    } else {
386
      code = TSDB_CODE_APP_IS_STOPPING;
×
387
    }
388
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
389
           pMsg->info.handle);
390
    return code;
×
391
  } else {
392
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
5,913,106✔
393
  }
394
}
395

396
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
8,830,102✔
397

398
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
8,823,700✔
399
  (void)rpcReleaseHandle(pHandle, type, status);
8,823,700✔
400
}
8,832,674✔
401

402
static bool rpcRfp(int32_t code, tmsg_t msgType) {
108,642✔
403
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
108,642!
404
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
91,319✔
405
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
60,706✔
406
      code == TSDB_CODE_APP_IS_STOPPING) {
407
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
52,193!
408
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE) {
52,243!
409
      return false;
×
410
    }
411
    return true;
52,243✔
412
  } else {
413
    return false;
56,449✔
414
  }
415
}
416
static bool rpcNoDelayMsg(tmsg_t msgType) {
×
417
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_QUERY_SSMIGRATE_PROGRESS ||
×
418
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE ||
×
419
      msgType == TDMT_VND_QUERY_SCAN_PROGRESS || msgType == TDMT_VND_QUERY_TRIM_PROGRESS) {
×
420
    return true;
×
421
  }
422
  return false;
×
423
}
424
int32_t dmInitClient(SDnode *pDnode) {
1,833✔
425
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
426

427
  SRpcInit rpcInit = {0};
1,833✔
428
  rpcInit.label = "DNODE-CLI";
1,833✔
429
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
1,833✔
430
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
1,833✔
431
  rpcInit.sessions = 1024;
1,833✔
432
  rpcInit.connType = TAOS_CONN_CLIENT;
1,833✔
433
  rpcInit.user = TSDB_DEFAULT_USER;
1,833✔
434
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,833✔
435
  rpcInit.parent = pDnode;
1,833✔
436
  rpcInit.rfp = rpcRfp;
1,833✔
437
  rpcInit.compressSize = tsCompressMsgSize;
1,833✔
438
  rpcInit.dfp = destroyAhandle;
1,833✔
439

440
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,833✔
441
  rpcInit.retryStepFactor = tsRedirectFactor;
1,833✔
442
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,833✔
443
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,833✔
444

445
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
1,833✔
446
  rpcInit.failFastThreshold = 3;    // failed threshold
1,833✔
447
  rpcInit.ffp = dmFailFastFp;
1,833✔
448

449
  rpcInit.noDelayFp = rpcNoDelayMsg;
1,833✔
450

451
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
1,833✔
452
  connLimitNum = TMAX(connLimitNum, 10);
1,833✔
453
  connLimitNum = TMIN(connLimitNum, 500);
1,833✔
454

455
  rpcInit.connLimitNum = connLimitNum;
1,833✔
456
  rpcInit.connLimitLock = 1;
1,833✔
457
  rpcInit.supportBatch = 1;
1,833✔
458
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
1,833✔
459
  rpcInit.shareConn = 1;
1,833✔
460
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,833✔
461
  rpcInit.notWaitAvaliableConn = 0;
1,833✔
462
  rpcInit.startReadTimer = 1;
1,833✔
463
  rpcInit.readTimeout = tsReadTimeout;
1,833✔
464
  rpcInit.ipv6 = tsEnableIpv6;
1,833✔
465
  rpcInit.enableSSL = tsEnableTLS;
1,833✔
466

467
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,833✔
468
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,833✔
469
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,833✔
470
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,833✔
471
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,833✔
472

473
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
1,833!
474
    dError("failed to convert version string:%s to int", td_version);
×
475
  }
476

477
  pTrans->clientRpc = rpcOpen(&rpcInit);
1,833✔
478
  if (pTrans->clientRpc == NULL) {
1,833!
479
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
×
480
    return terrno;
×
481
  }
482

483
  dDebug("dnode rpc client is initialized");
1,833✔
484
  return 0;
1,833✔
485
}
486
int32_t dmInitStatusClient(SDnode *pDnode) {
1,833✔
487
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
488

489
  SRpcInit rpcInit = {0};
1,833✔
490
  rpcInit.label = "DNODE-STA-CLI";
1,833✔
491
  rpcInit.numOfThreads = 1;
1,833✔
492
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
1,833✔
493
  rpcInit.sessions = 1024;
1,833✔
494
  rpcInit.connType = TAOS_CONN_CLIENT;
1,833✔
495
  rpcInit.user = TSDB_DEFAULT_USER;
1,833✔
496
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,833✔
497
  rpcInit.parent = pDnode;
1,833✔
498
  rpcInit.rfp = rpcRfp;
1,833✔
499
  rpcInit.compressSize = tsCompressMsgSize;
1,833✔
500

501
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,833✔
502
  rpcInit.retryStepFactor = tsRedirectFactor;
1,833✔
503
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,833✔
504
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,833✔
505

506
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
1,833✔
507
  rpcInit.failFastThreshold = 3;    // failed threshold
1,833✔
508
  rpcInit.ffp = dmFailFastFp;
1,833✔
509

510
  int32_t connLimitNum = 100;
1,833✔
511
  connLimitNum = TMAX(connLimitNum, 10);
1,833✔
512
  connLimitNum = TMIN(connLimitNum, 500);
1,833✔
513

514
  rpcInit.connLimitNum = connLimitNum;
1,833✔
515
  rpcInit.connLimitLock = 1;
1,833✔
516
  rpcInit.supportBatch = 1;
1,833✔
517
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
1,833✔
518
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,833✔
519
  rpcInit.startReadTimer = 0;
1,833✔
520
  rpcInit.readTimeout = 0;
1,833✔
521
  rpcInit.ipv6 = tsEnableIpv6;
1,833✔
522

523
  rpcInit.enableSSL = tsEnableTLS;
1,833✔
524
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,833✔
525
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,833✔
526
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,833✔
527
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,833✔
528
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,833✔
529

530
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
1,833!
531
    dError("failed to convert version string:%s to int", td_version);
×
532
  }
533

534
  pTrans->statusRpc = rpcOpen(&rpcInit);
1,833✔
535
  if (pTrans->statusRpc == NULL) {
1,833!
536
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
×
537
    return terrno;
×
538
  }
539

540
  dDebug("dnode rpc status client is initialized");
1,833✔
541
  return 0;
1,833✔
542
}
543

544
int32_t dmInitSyncClient(SDnode *pDnode) {
1,833✔
545
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
546

547
  SRpcInit rpcInit = {0};
1,833✔
548
  rpcInit.label = "DNODE-SYNC-CLI";
1,833✔
549
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
1,833✔
550
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
1,833✔
551
  rpcInit.sessions = 1024;
1,833✔
552
  rpcInit.connType = TAOS_CONN_CLIENT;
1,833✔
553
  rpcInit.user = TSDB_DEFAULT_USER;
1,833✔
554
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,833✔
555
  rpcInit.parent = pDnode;
1,833✔
556
  rpcInit.rfp = rpcRfp;
1,833✔
557
  rpcInit.compressSize = tsCompressMsgSize;
1,833✔
558

559
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,833✔
560
  rpcInit.retryStepFactor = tsRedirectFactor;
1,833✔
561
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,833✔
562
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,833✔
563

564
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
1,833✔
565
  rpcInit.failFastThreshold = 3;    // failed threshold
1,833✔
566
  rpcInit.ffp = dmFailFastFp;
1,833✔
567

568
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
1,833✔
569
  connLimitNum = TMAX(connLimitNum, 10);
1,833✔
570
  connLimitNum = TMIN(connLimitNum, 500);
1,833✔
571

572
  rpcInit.connLimitNum = connLimitNum;
1,833✔
573
  rpcInit.connLimitLock = 1;
1,833✔
574
  rpcInit.supportBatch = 1;
1,833✔
575
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
1,833✔
576
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,833✔
577
  rpcInit.startReadTimer = 1;
1,833✔
578
  rpcInit.readTimeout = tsReadTimeout;
1,833✔
579
  rpcInit.ipv6 = tsEnableIpv6;
1,833✔
580
  rpcInit.enableSSL = tsEnableTLS;
1,833✔
581

582
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,833✔
583
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,833✔
584
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,833✔
585
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,833✔
586
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,833✔
587
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
1,833!
588
    dError("failed to convert version string:%s to int", td_version);
×
589
  }
590

591
  pTrans->syncRpc = rpcOpen(&rpcInit);
1,833✔
592
  if (pTrans->syncRpc == NULL) {
1,833!
593
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
×
594
    return terrno;
×
595
  }
596

597
  dDebug("dnode rpc sync client is initialized");
1,833✔
598
  return 0;
1,833✔
599
}
600

601
void dmCleanupClient(SDnode *pDnode) {
1,833✔
602
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
603
  if (pTrans->clientRpc) {
1,833!
604
    rpcClose(pTrans->clientRpc);
1,833✔
605
    pTrans->clientRpc = NULL;
1,833✔
606
    dDebug("dnode rpc client is closed");
1,833✔
607
  }
608
}
1,833✔
609
void dmCleanupStatusClient(SDnode *pDnode) {
1,833✔
610
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
611
  if (pTrans->statusRpc) {
1,833!
612
    rpcClose(pTrans->statusRpc);
1,833✔
613
    pTrans->statusRpc = NULL;
1,833✔
614
    dDebug("dnode rpc status client is closed");
1,833✔
615
  }
616
}
1,833✔
617
void dmCleanupSyncClient(SDnode *pDnode) {
1,833✔
618
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
619
  if (pTrans->syncRpc) {
1,833!
620
    rpcClose(pTrans->syncRpc);
1,833✔
621
    pTrans->syncRpc = NULL;
1,833✔
622
    dDebug("dnode rpc sync client is closed");
1,833✔
623
  }
624
}
1,833✔
625

626
int32_t dmInitServer(SDnode *pDnode) {
1,834✔
627
  int32_t      code = 0;
1,834✔
628
  SDnodeTrans *pTrans = &pDnode->trans;
1,834✔
629

630
  SRpcInit rpcInit = {0};
1,834✔
631
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
1,834✔
632

633
  rpcInit.localPort = tsServerPort;
1,834✔
634
  rpcInit.label = "DND-S";
1,834✔
635
  rpcInit.numOfThreads = tsNumOfRpcThreads;
1,834✔
636
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
1,834✔
637
  rpcInit.sessions = tsMaxShellConns;
1,834✔
638
  rpcInit.connType = TAOS_CONN_SERVER;
1,834✔
639
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,834✔
640
  rpcInit.parent = pDnode;
1,834✔
641
  rpcInit.compressSize = tsCompressMsgSize;
1,834✔
642
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
1,834✔
643
  rpcInit.ipv6 = tsEnableIpv6;
1,834✔
644
  rpcInit.enableSSL = tsEnableTLS;
1,834✔
645

646
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,834✔
647
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,834✔
648
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,834✔
649
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,834✔
650
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,834✔
651

652
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
1,834!
653
    dError("failed to convert version string:%s to int", td_version);
×
654
  }
655

656
  pTrans->serverRpc = rpcOpen(&rpcInit);
1,834✔
657
  if (pTrans->serverRpc == NULL) {
1,834✔
658
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
1!
659
    return terrno;
1✔
660
  }
661

662
  dDebug("dnode rpc server is initialized");
1,833✔
663
  return 0;
1,833✔
664
}
665

666
void dmCleanupServer(SDnode *pDnode) {
1,833✔
667
  SDnodeTrans *pTrans = &pDnode->trans;
1,833✔
668
  if (pTrans->serverRpc) {
1,833!
669
    rpcClose(pTrans->serverRpc);
1,833✔
670
    pTrans->serverRpc = NULL;
1,833✔
671
    dDebug("dnode rpc server is closed");
1,833✔
672
  }
673
}
1,833✔
674

675
SMsgCb dmGetMsgcb(SDnode *pDnode) {
17,634✔
676
  SMsgCb msgCb = {
17,634✔
677
      .clientRpc = pDnode->trans.clientRpc,
17,634✔
678
      .serverRpc = pDnode->trans.serverRpc,
17,634✔
679
      .statusRpc = pDnode->trans.statusRpc,
17,634✔
680
      .syncRpc = pDnode->trans.syncRpc,
17,634✔
681
      .sendReqFp = dmSendReq,
682
      .sendSyncReqFp = dmSendSyncReq,
683
      .sendRspFp = dmSendRsp,
684
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
685
      .releaseHandleFp = dmReleaseHandle,
686
      .reportStartupFp = dmReportStartup,
687
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
688
      .getDnodeEpFp = dmGetDnodeEp,
689
      .data = &pDnode->data,
17,634✔
690
  };
691
  return msgCb;
17,634✔
692
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc