• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4910

30 Dec 2025 10:52AM UTC coverage: 65.864% (+0.3%) from 65.542%
#4910

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

999 existing lines in 108 files now uncovered.

194877 of 295877 relevant lines covered (65.86%)

121300574.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.94
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg) || TDMT_STREAM_TRIGGER_DROP_RSP == (_msg))
22

23
// Hand a response to the RPC layer. A non-zero result from rpcSendResponse()
// is logged only; nothing is reported back to the caller.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
1,835,537✔
28

29
// Pick the name to report for a connection: the identifier when isToken is
// set, the user field otherwise, and the literal "unknown" when no
// connection info is supplied.
static char *getUserFromConnInfo(SRpcConnInfo *pConnInfo) {
  if (pConnInfo != NULL) {
    if (pConnInfo->isToken) {
      return pConnInfo->identifier;
    }
    return pConnInfo->user;
  }
  return "unknown";
}
35

36
/*
 * Fill pMsg with a serialized mnode endpoint set so the peer can redirect.
 *
 * On success pMsg->pCont / pMsg->contLen hold the serialized SEpSet and
 * pMsg->code is left untouched. On failure pMsg->pCont is NULL and pMsg->code
 * is set to TSDB_CODE_MNODE_NOT_FOUND, TSDB_CODE_OUT_OF_MEMORY, or the
 * negative value returned by tSerializeSEpSet().
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps == 0) {
    pMsg->pCont = NULL;
    pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
    return;
  }

  // A single endpoint that is this dnode itself gives the peer nowhere else
  // to go, so report "mnode not found" instead of redirecting it back here.
  if (epSet.numOfEps == 1 && strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
    pMsg->pCont = NULL;
    pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
    return;
  }

  // First pass only computes the encoded size; a negative value is an error
  // and must not be used as an allocation size.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not ship a buffer whose content and length are undefined.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
68

69
/*
 * Dispatch pMsg to the handler registered in pWrapper for its message type.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the
 * wrapper has no handler for this type. pMsg->info.wrapper is set before the
 * handler runs.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not referenced directly below; presumably picked up by the dG* log
  // macros, so the name must stay `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
83

84
// Tell the RPC layer which message types are fail-fast candidates:
// currently sync heartbeats and sync append-entries only.
static bool dmFailFastFp(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      // add more msg type later
      return false;
  }
}
88

89
// Map TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
// strictly inside the VND or SCH ranges; every other code passes through.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code == TSDB_CODE_APP_IS_STOPPING) {
    bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN) && (msgType < TDMT_VND_MSG_MAX);
    bool inSchRange = (msgType > TDMT_SCH_MSG_MIN) && (msgType < TDMT_SCH_MSG_MAX);
    if (inVnodeRange || inSchRange) {
      return TSDB_CODE_VND_STOPPED;
    }
  }
  return code;
}
99
/*
 * Decode an IP white list from pRpc and install it on the RPC handle pTrans,
 * then record the list version in pData->ipWhiteVer.
 *
 * The message body (pRpc->pCont) is always released here, including when
 * decoding fails: the caller returns immediately after this function.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    goto _exit;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
  }
  // NOTE(review): the version is recorded even when rpcSetIpWhite() reports a
  // failure, as before — confirm whether a failed install should be retried.
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

_exit:
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
114

115
// Handler for the white-list response that is not applied: logs
// TSDB_CODE_INVALID_MSG and releases the message body. pDnode and pTrans
// are not used.
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  dError("failed to update rpc ip-white since: %s", tstrerror(TSDB_CODE_INVALID_MSG));

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
122
/*
 * Translate the `forbidden` flags attached to a connection into an error code.
 *
 * Returns TSDB_CODE_IP_NOT_IN_WHITE_LIST when the white-list check flag is
 * set, TSDB_CODE_MND_USER_DISABLED when the date-time check flag is set, and
 * 0 otherwise. The white-list flag takes precedence. user and clientIp are
 * used for logging only.
 */
static int32_t dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (IP_FORBIDDEN_CHECK_WHITE_LIST(forbidden)) {
    dError("User:%s host:%s not in ip white list or in block white list", user, IP_ADDR_STR(clientIp));
    return TSDB_CODE_IP_NOT_IN_WHITE_LIST;
  }

  if (IP_FORBIDDEN_CHECK_DATA_TIME_WHITE_LIST(forbidden)) {
    dError("User:%s host:%s already expired", user, IP_ADDR_STR(clientIp));
    return TSDB_CODE_MND_USER_DISABLED;
  }

  return 0;
}
134

135
/*
 * Decode a date-time white list from pRpc and install it on the RPC handle
 * pTrans, then record the list version in pData->timeWhiteVer.
 *
 * The message body (pRpc->pCont) is always released here, including when
 * decoding fails: the caller returns immediately after this function.
 */
static void dmUpdateRpcTimeWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveDateTimeWhiteListRsp timeWhite = {0};

  int32_t code = tDeserializeSRetrieveDateTimeWhiteListRsp(pRpc->pCont, pRpc->contLen, &timeWhite);
  if (code < 0) {
    dError("failed to update rpc datetime-white since: %s", tstrerror(code));
    goto _exit;
  }

  code = rpcSetTimeIpWhite(pTrans, &timeWhite);
  if (code != 0) {
    dError("failed to update rpc datetime-white since: %s", tstrerror(code));
  }
  // NOTE(review): the version is recorded even when rpcSetTimeIpWhite()
  // reports a failure, as before — confirm whether that is intended.
  pData->timeWhiteVer = timeWhite.ver;

  (void)tFreeSRetrieveDateTimeWhiteListRsp(&timeWhite);

_exit:
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
151

152
// Decode an analytic-algorithm response and pass its version and hash to
// taosAnalyUpdate(). rsp.hash is cleared after the hand-off so the free call
// below does not see it; the message body is released in every case.
// pData and pTrans are not used.
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp rsp = {0};

  bool decoded = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0);
  if (decoded) {
    taosAnalyUpdate(rsp.ver, rsp.hash);
    rsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
×
161

162
/*
 * RPC callback shared by the dnode's server and client transports.
 *
 * Steps, in order:
 *   1. check client/server version compatibility and the forbidden-IP flags;
 *   2. handle a fixed set of message types inline (net test, query-worker
 *      responses, white-list / analytic-algorithm responses) and return;
 *   3. reject messages while the dnode is not in DND_STAT_RUNNING;
 *   4. pick the target wrapper (by handle table, optionally by the vgId in the
 *      message head), copy the message into a queue item and dispatch it.
 *
 * On any failure (label _OVER) a response carrying the error code is sent for
 * requests, the queue item (if any) is freed and pRpc->pCont is released.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  // NOTE(review): pDnode may be NULL here (see the TD-22618 note below), yet it
  // is used to derive pTrans/pHandle and by several switch cases before the
  // NULL check. Left unchanged; confirm those paths cannot run with a NULL parent.
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  code = dmIsForbiddenIp(pRpc->info.forbiddenIp, RPC_MSG_USER(pRpc), &pRpc->info.conn.cliAddr);
  if (code != 0) {
    goto _OVER;
  }

  // Message types handled inline; every case except TDMT_MND_STATUS_RSP returns.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
    case TDMT_STREAM_FETCH_RSP:
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
    case TDMT_VND_SNODE_DROP_TABLE_RSP:
      // The result was never inspected; the function returns right away.
      (void)qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_RSP:
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_DATETIME_WHITELIST_RSP:
      dmUpdateRpcTimeWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only: an empty response carrying a network error is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen > 0) {
      // The vgId in the message head may override the default target node.
      const SMsgHead *pHead = pRpc->pCont;
      const int32_t   vgId = ntohl(pHead->vgId);
      switch (vgId) {
        case QNODE_HANDLE:
          pWrapper = &pDnode->wrappers[QNODE];
          break;
        case SNODE_HANDLE:
          pWrapper = &pDnode->wrappers[SNODE];
          break;
        case MNODE_HANDLE:
          pWrapper = &pDnode->wrappers[MNODE];
          break;
        default:
          break;
      }
    } else {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Not marked, so it must not be released at the end.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // Only requests other than sync heartbeat / heartbeat-reply use RPC_QITEM;
  // rsp msg is not restricted by tsQueueMemoryUsed.
  bool    isHeartbeat = (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY);
  EQItype itype = (IsReq(pRpc) && !isHeartbeat) ? RPC_QITEM : DEF_QITEM;

  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }
      // Both the wrapper and no-wrapper paths sent and logged identically.
      dmSendRsp(&rsp);
    } else if (NULL == pMsg && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
      destroyAhandle(pRpc->info.ahandle);
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
355

356
/*
 * Build the message dispatch tables from the handler lists each node type
 * exposes through getHandlesFp.
 *
 * For every handler entry: entries flagged needCheckVgId set that flag on the
 * dnode-level handle, all others set the handle's default node type; the
 * handler function is stored in the wrapper's per-message table either way.
 *
 * Returns 0 on success, -1 if any node type returns no handler list.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandlers); ++idx) {
      SMgmtHandle  *pEntry = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pSlot = &pTrans->msgHandles[TMSG_INDEX(pEntry->msgType)];

      if (pEntry->needCheckVgId) {
        pSlot->needCheckVgId = pEntry->needCheckVgId;
      } else {
        pSlot->defaultNtype = ntype;
      }
      pWrapper->msgFps[TMSG_INDEX(pEntry->msgType)] = pEntry->msgFp;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}
381

382
/*
 * Send a request through the dnode's client RPC handle.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are refused: the body is released and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 * Otherwise the result of rpcSendRequest() is returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!refused) {
    pMsg->info.handle = 0;
    int32_t sendCode = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    if (sendCode != 0) {
      dError("failed to send rpc msg");
    }
    return sendCode;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
406
/*
 * Send a request through the dnode's sync RPC handle.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are refused: the body is released and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 * Otherwise the result of rpcSendRequest() is returned.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  bool refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!refused) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
424

425
// Forward to rpcRegisterBrokenLinkArg(); its result is intentionally discarded.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
511,354,836✔
426

427
// Forward to rpcReleaseHandle() with the same arguments; its result is
// intentionally discarded.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
  return;
}
509,843,726✔
430

431
/*
 * Retry filter installed as SRpcInit.rfp.
 *
 * Returns true only when `code` is one of the listed transient codes AND the
 * message type is not one of the excluded scheduler / TTL-drop types.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool transientCode = (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) || (code == TSDB_CODE_RPC_BROKEN_LINK) ||
                       (code == TSDB_CODE_MNODE_NOT_FOUND) || (code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) ||
                       (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING) ||
                       (code == TSDB_CODE_VND_STOPPED) || (code == TSDB_CODE_APP_IS_STARTING) ||
                       (code == TSDB_CODE_APP_IS_STOPPING);
  if (!transientCode) {
    return false;
  }

  bool excludedType = (msgType == TDMT_SCH_QUERY) || (msgType == TDMT_SCH_MERGE_QUERY) ||
                      (msgType == TDMT_SCH_FETCH) || (msgType == TDMT_SCH_MERGE_FETCH) ||
                      (msgType == TDMT_SCH_TASK_NOTIFY) || (msgType == TDMT_VND_DROP_TTL_TABLE);
  return !excludedType;
}
445
// Filter installed as SRpcInit.noDelayFp: true for the vnode TTL and
// progress-query message types listed below, false for everything else.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_QUERY_SSMIGRATE_PROGRESS:
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
    case TDMT_VND_DROP_TTL_TABLE:
    case TDMT_VND_QUERY_SCAN_PROGRESS:
    case TDMT_VND_QUERY_TRIM_PROGRESS:
      return true;
    default:
      return false;
  }
}
453
/*
 * Open the dnode's general-purpose RPC client ("DNODE-CLI") and store it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.dfp = destroyAhandle;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  rpcInit.noDelayFp = rpcNoDelayMsg;

  // Per-thread connection budget, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.shareConn = 1;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.notWaitAvaliableConn = 0;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded copies: never write past the destination arrays, whatever the
  // length of the configured paths.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
516
/*
 * Open the single-threaded status RPC client ("DNODE-STA-CLI") and store it in
 * pDnode->trans.statusRpc. The read timer is disabled for this client.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-STA-CLI";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Fixed budget; the clamps mirror the other clients and leave 100 unchanged.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 0;
  rpcInit.readTimeout = 0;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSasl = tsEnableSasl;

  rpcInit.enableSSL = tsEnableTLS;
  // Bounded copies: never write past the destination arrays, whatever the
  // length of the configured paths.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
574

575
/*
 * Open the sync RPC client ("DNODE-SYNC-CLI") and store it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-SYNC-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Half of the general client's per-thread budget, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded copies: never write past the destination arrays, whatever the
  // length of the configured paths.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
632

633
// Close the general RPC client if it is open and clear the stored handle.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
536,336✔
641
// Close the status RPC client if it is open and clear the stored handle.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
536,336✔
649
// Close the sync RPC client if it is open and clear the stored handle.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
536,336✔
657

658
/*
 * Open the dnode's RPC server ("DND-S") on tsLocalFqdn:tsServerPort and store
 * it in pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and does not abort initialization.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded copies: never write past the destination arrays, whatever the
  // length of the configured paths.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
698

699
// Close the RPC server if it is open and clear the stored handle.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
536,336✔
707

708
/*
 * Build the callback table handed to the managed nodes: the four RPC handles
 * currently stored in pDnode->trans, the transport helpers of this file, the
 * dnode-level helpers, and a pointer to pDnode->data. Fields not listed stay
 * zero. The table is returned by value.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       cb = {0};

  cb.clientRpc = pTrans->clientRpc;
  cb.serverRpc = pTrans->serverRpc;
  cb.statusRpc = pTrans->statusRpc;
  cb.syncRpc = pTrans->syncRpc;

  cb.sendReqFp = dmSendReq;
  cb.sendSyncReqFp = dmSendSyncReq;
  cb.sendRspFp = dmSendRsp;
  cb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  cb.releaseHandleFp = dmReleaseHandle;

  cb.reportStartupFp = dmReportStartup;
  cb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  cb.getDnodeEpFp = dmGetDnodeEp;
  cb.data = &pDnode->data;

  return cb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc