• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5013

03 Apr 2026 03:59PM UTC coverage: 72.317% (+0.01%) from 72.305%
#5013

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

13131 existing lines in 160 files now uncovered.

257489 of 356056 relevant lines covered (72.32%)

129893134.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.53
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21
// Evaluates to true when _msg is one of the stream-trigger response types
// (calc, pull or drop). Note: _msg is evaluated up to three times.
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg) || TDMT_STREAM_TRIGGER_DROP_RSP == (_msg))
22

23
// Hands pMsg to rpcSendResponse(); a non-zero result is logged and otherwise
// ignored, so callers get no failure indication.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  int32_t ret = rpcSendResponse(pMsg);
  if (ret == 0) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
2,423,866✔
28

29
// Picks the name used to identify a connection: the token identifier when the
// connection is token based, the user name otherwise. A NULL pConnInfo yields
// the literal "unknown" (a string literal: callers must not modify it).
static char *getUserFromConnInfo(SRpcConnInfo *pConnInfo) {
  if (pConnInfo != NULL) {
    if (pConnInfo->isToken) {
      return pConnInfo->identifier;
    }
    return pConnInfo->user;
  }

  return "unknown";
}
35

36
// Fills pMsg with a serialized mnode epSet so the peer can redirect its
// request.
//
// Outcomes:
//  - no usable epSet (empty, or the only entry is this dnode itself):
//    pMsg->pCont = NULL and pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
//  - sizing or serialization failure: pMsg->pCont = NULL and pMsg->code is the
//    negative value returned by tSerializeSEpSet();
//  - allocation failure: pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
//  - success: pMsg->pCont / pMsg->contLen hold the serialized epSet and
//    pMsg->code is left untouched.
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // A single endpoint that points back at this dnode is useless as a redirect
    // target: this dnode is not the mnode (or not the leader), so returning it
    // would hand the peer an incorrect or misleading epSet.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass only computes the encoded size; a negative result is an error
  // and must not be used as an allocation length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a partially written buffer attached to the response.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->contLen = contLen;
}
68

69
// Dispatches pMsg to the handler that pWrapper registered for its message type.
//
// Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when the wrapper
// has no handler for this type. On dispatch, pMsg->info.wrapper is set to
// pWrapper before the handler runs.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not referenced directly below; presumably picked up by name inside the
  // dG* logging macros, so keep the identifier `trace` -- TODO confirm.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return (*handler)(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
83

84
// Fail-fast predicate installed as SRpcInit.ffp: true only for sync heartbeat
// and sync append-entries messages. More message types may be added later.
static bool dmFailFastFp(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
88

89
// Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
// that fall strictly inside the vnode or scheduler ranges; every other
// (code, msgType) combination is returned unchanged.
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code != TSDB_CODE_APP_IS_STOPPING) {
    return code;
  }

  const bool isVnodeMsg = (msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX);
  const bool isSchMsg = (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX);

  return (isVnodeMsg || isSchMsg) ? TSDB_CODE_VND_STOPPED : code;
}
99
// Applies an IP white-list update carried in pRpc to the transport pTrans and
// records the new list version in pData->ipWhiteVer.
//
// This function consumes pRpc->pCont: the buffer is released on every path
// (including a failed deserialization) and the pointer is reset to NULL.
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // The caller returns right after this handler, so nobody else frees pCont.
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  // The version is recorded regardless of the set result (unchanged behavior).
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
114

115
// Handler for the non-dual IP white-list response: the payload is never
// applied. It logs TSDB_CODE_INVALID_MSG, releases pRpc->pCont and clears the
// pointer. pDnode and pTrans are unused.
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  const int32_t reason = TSDB_CODE_INVALID_MSG;

  dError("failed to update rpc ip-white since: %s", tstrerror(reason));

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
122
static int32_t dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
2,147,483,647✔
123
  if (IP_FORBIDDEN_CHECK_WHITE_LIST(forbidden)) {
2,147,483,647✔
124
    dError("User:%s host:%s not in ip white list or in block white list", user, IP_ADDR_STR(clientIp));
×
125
    return TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
126

127
  } else if (IP_FORBIDDEN_CHECK_DATA_TIME_WHITE_LIST(forbidden)) {
2,147,483,647✔
128
    dError("User:%s host:%s already expired", user, IP_ADDR_STR(clientIp));
×
129
    return TSDB_CODE_MND_USER_DISABLED;
×
130
  } else {
131
    return 0;
2,147,483,647✔
132
  }
133
}
134

135
// Applies a date-time white-list update carried in pRpc to the transport
// pTrans and records the new list version in pData->timeWhiteVer.
//
// This function consumes pRpc->pCont: the buffer is released on every path
// (including a failed deserialization) and the pointer is reset to NULL.
static void dmUpdateRpcTimeWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveDateTimeWhiteListRsp timeWhite = {0};

  int32_t code = tDeserializeSRetrieveDateTimeWhiteListRsp(pRpc->pCont, pRpc->contLen, &timeWhite);
  if (code < 0) {
    dError("failed to update rpc datetime-white since: %s", tstrerror(code));
    // The caller returns right after this handler, so nobody else frees pCont.
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return;
  }

  code = rpcSetTimeIpWhite(pTrans, &timeWhite);
  if (code != 0) {
    dError("failed to set rpc datetime-white since: %s", tstrerror(code));
  }
  // The version is recorded regardless of the set result (unchanged behavior).
  pData->timeWhiteVer = timeWhite.ver;

  (void)tFreeSRetrieveDateTimeWhiteListRsp(&timeWhite);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
151

152
// Decodes an analytic-algorithm response and, when decoding succeeds, passes
// its version and hash to taosAnalyUpdate(). The response struct and
// pRpc->pCont are released on every path. pData and pTrans are unused.
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp rsp = {0};

  const bool decoded = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0);
  if (decoded) {
    taosAnalyUpdate(rsp.ver, rsp.hash);
    // Detach the hash so the free call below does not see it; presumably
    // taosAnalyUpdate() took ownership -- TODO confirm.
    rsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
×
161

162
// RPC callback shared by the dnode server and client transports (installed as
// SRpcInit.cfp). Validates an incoming message, routes it to the owning node
// wrapper and, on failure, answers requests with an error response.
//
// Flow:
//  1. check client/server version compatibility and the forbidden-IP flag;
//  2. handle a few message types inline (net test, query-worker responses,
//     white-list / analytic updates) and return;
//  3. reject everything except TDMT_DND_SERVER_STATUS while the dnode is not
//     in DND_STAT_RUNNING;
//  4. pick the wrapper (default node type, or QNODE/SNODE/MNODE chosen by the
//     vgId in the message head), copy the message into a queue item and
//     dispatch it through dmProcessNodeMsg();
//  5. at _OVER with a non-zero code: send an error response for requests,
//     free the queue item (if any) and release pRpc->pCont.
//
// pEpSet is only consulted for TDMT_MND_STATUS_RSP.
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));

  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  code = dmIsForbiddenIp(pRpc->info.forbiddenIp, RPC_MSG_USER(pRpc), &pRpc->info.conn.cliAddr);
  if (code != 0) {
    goto _OVER;
  }

  // Message types that are fully handled here and never reach a node queue.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
    case TDMT_VND_TABLE_META_RSP:
    case TDMT_STREAM_FETCH_RSP:
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
    case TDMT_VND_SNODE_DROP_TABLE_RSP:
      // The result is not acted upon; the query worker owns the message.
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_RSP:
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_DATETIME_WHITELIST_RSP:
      dmUpdateRpcTimeWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
  pDnode is null, TD-22618
  at trans.c line 91
  before this line, dmProcessRpcMsg callback is set
  after this line, parent is set
  so when dmProcessRpcMsg is called, pDnode is still null.
  */
  if (pDnode != NULL) {
    if (pDnode->status != DND_STAT_RUNNING) {
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
        dmProcessServerStartupStatus(pDnode, pRpc);
        return;
      } else {
        if (pDnode->status == DND_STAT_INIT) {
          code = TSDB_CODE_APP_IS_STARTING;
        } else {
          code = TSDB_CODE_APP_IS_STOPPING;
        }
        goto _OVER;
      }
    }
  } else {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only: a bodiless response carrying a network error is still dispatched.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen <= 0) {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }
    // The vgId lives inside SMsgHead; a payload shorter than the head must not
    // be read as one.
    if (pRpc->contLen < (int32_t)sizeof(SMsgHead)) {
      dGError("msg:%p, type:%s contLen:%d is shorter than msg head", pRpc, TMSG_INFO(pRpc->msgType), pRpc->contLen);
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }

    const SMsgHead *pHead = pRpc->pCont;
    const int32_t   vgId = ntohl(pHead->vgId);
    switch (vgId) {
      case QNODE_HANDLE:
        pWrapper = &pDnode->wrappers[QNODE];
        break;
      case SNODE_HANDLE:
        pWrapper = &pDnode->wrappers[SNODE];
        break;
      case MNODE_HANDLE:
        pWrapper = &pDnode->wrappers[MNODE];
        break;
      default:
        // Any other vgId keeps the default wrapper chosen above.
        break;
    }
  }

  if ((code = dmMarkWrapper(pWrapper)) != 0) {
    // Not marked, so it must not be released at the end.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
  if (IsReq(pRpc)) {
    if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
      itype = DEF_QITEM;
    } else {
      itype = RPC_QITEM;
    }
  } else {
    itype = DEF_QITEM;
  }
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      // The response type is the request type + 1.
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      // Same send-and-log behavior whether or not a wrapper was marked.
      dmSendRsp(&rsp);
    } else if (NULL == pMsg && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
      destroyAhandle(pRpc->info.ahandle);
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
356

357
// Builds the message dispatch tables from the handler lists exported by every
// node type in [DNODE, NODE_END).
//
// For each handler entry:
//  - pTrans->msgHandles[idx].needCheckVgId is set when the entry asks for a
//    vgId check; otherwise .defaultNtype is set to the current node type;
//  - pWrapper->msgFps[idx] always receives the entry's handler function.
//
// Returns 0 on success, -1 when a node type returns no handler array.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandlers = (*pWrapper->func.getHandlesFp)();
    if (pHandlers == NULL) {
      return -1;
    }

    for (int32_t idx = 0; idx < taosArrayGetSize(pHandlers); ++idx) {
      SMgmtHandle  *pMgmt = taosArrayGet(pHandlers, idx);
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pHandle->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
    }

    taosArrayDestroy(pHandlers);
  }

  return 0;
}
382

383
// Sends pMsg to pEpSet through the dnode's general client RPC.
//
// While the dnode is not running, messages whose type is below
// TDMT_SYNC_MSG_MIN are rejected: pMsg->pCont is freed and
// TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
// Otherwise pMsg->info.handle is cleared and the rpcSendRequest() result is
// returned (0 on success).
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool rejected = (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    pMsg->info.handle = 0;
    int32_t sendCode = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    if (sendCode != 0) {
      dError("failed to send rpc msg");
    }
    return sendCode;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
407
// Sends pMsg to pEpSet through the dnode's sync client RPC.
//
// While the dnode is not running, messages whose type is below
// TDMT_SYNC_MSG_MIN are rejected: pMsg->pCont is freed and
// TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
// Otherwise the rpcSendRequest() result is returned as is.
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  const bool rejected = (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN);
  if (!rejected) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
425

426
// SMsgCb adapter around rpcRegisterBrokenLinkArg(); its result is deliberately
// discarded.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
547,075,617✔
427

428
// SMsgCb adapter around rpcReleaseHandle(); arguments are forwarded unchanged
// and the result is deliberately discarded.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(
      pHandle,
      type,
      status);
}
545,056,162✔
431

432
// Retry predicate installed as SRpcInit.rfp.
//
// Returns true only when `code` is one of the listed network / leadership /
// start-stop errors AND `msgType` is not one of the scheduler query/fetch/
// notify messages or TDMT_VND_DROP_TTL_TABLE. Everything else returns false.
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  const bool retryableCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
      code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
      code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!retryableCode) {
    return false;
  }

  const bool excludedMsg =
      msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE;

  return !excludedMsg;
}
446
// Predicate installed as SRpcInit.noDelayFp: true for the TTL fetch/drop
// messages and the ssmigrate / compact / scan / trim progress queries.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  return msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_QUERY_SSMIGRATE_PROGRESS ||
         msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE ||
         msgType == TDMT_VND_QUERY_SCAN_PROGRESS || msgType == TDMT_VND_QUERY_TRIM_PROGRESS;
}
454
// Configures and opens the dnode's general RPC client ("DNODE-CLI") and stores
// the handle in pDnode->trans.clientRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails. A failure to convert
// td_version is logged but does not abort initialization.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.dfp = destroyAhandle;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  rpcInit.noDelayFp = rpcNoDelayMsg;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.shareConn = 1;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.notWaitAvaliableConn = 0;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded, always-terminated copies: never write past the destination
  // buffers even if a configured path is longer than expected.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
517
// Configures and opens the single-threaded RPC client used for status traffic
// ("DNODE-STA-CLI") and stores the handle in pDnode->trans.statusRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails. A failure to convert
// td_version is logged but does not abort initialization.
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-STA-CLI";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Fixed limit; it already lies inside the [10, 500] range the other clients
  // clamp to, so no clamping is needed here.
  rpcInit.connLimitNum = 100;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 0;
  rpcInit.readTimeout = 0;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSasl = tsEnableSasl;

  rpcInit.enableSSL = tsEnableTLS;
  // Bounded, always-terminated copies: never write past the destination
  // buffers even if a configured path is longer than expected.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
575

576
// Configures and opens the RPC client used for sync traffic ("DNODE-SYNC-CLI")
// and stores the handle in pDnode->trans.syncRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails. A failure to convert
// td_version is logged but does not abort initialization.
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-SYNC-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded, always-terminated copies: never write past the destination
  // buffers even if a configured path is longer than expected.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
633

634
// Closes the general RPC client if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
628,671✔
642
// Closes the status RPC client if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
628,671✔
650
// Closes the sync RPC client if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
628,671✔
658

659
// Configures and opens the dnode's RPC server ("DND-S") on
// tsLocalFqdn:tsServerPort and stores the handle in pDnode->trans.serverRpc.
//
// Returns 0 on success, or terrno when rpcOpen() fails. A failure to convert
// td_version is logged but does not abort initialization.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;
  rpcInit.enableSasl = tsEnableSasl;

  // Bounded, always-terminated copies: never write past the destination
  // buffers even if a configured path is longer than expected.
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
699

700
// Closes the RPC server if it is open and clears the stored handle.
// Does nothing when the handle is already NULL.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
628,671✔
708

709
// Builds, by value, the message-callback table for this dnode: the four RPC
// handles currently stored in pDnode->trans, the send/release/report helper
// functions, and a pointer to pDnode->data. Fields not listed stay zero.
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  SMsgCb       msgCb = {0};

  // Transport handles (snapshot of the current values).
  msgCb.clientRpc = pTrans->clientRpc;
  msgCb.serverRpc = pTrans->serverRpc;
  msgCb.statusRpc = pTrans->statusRpc;
  msgCb.syncRpc = pTrans->syncRpc;

  // Callbacks.
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendSyncReqFp = dmSendSyncReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;
  msgCb.reportStartupFp = dmReportStartup;
  msgCb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  msgCb.getDnodeEpFp = dmGetDnodeEp;

  msgCb.data = &pDnode->data;

  return msgCb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc