• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4897

25 Dec 2025 10:17AM UTC coverage: 65.717% (-0.2%) from 65.929%
#4897

push

travis-ci

web-flow
fix: [6622889291] Fix invalid rowSize. (#34043)

186011 of 283047 relevant lines covered (65.72%)

113853896.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.61
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg) || TDMT_STREAM_TRIGGER_DROP_RSP == (_msg))
22

23
static inline void dmSendRsp(SRpcMsg *pMsg) {
717,602✔
24
  if (rpcSendResponse(pMsg) != 0) {
717,602✔
25
    dError("failed to send response, msg:%p", pMsg);
×
26
  }
27
}
717,602✔
28

29
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
100,318✔
30
  SEpSet epSet = {0};
100,318✔
31
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
100,318✔
32

33
  if (epSet.numOfEps <= 1) {
100,323✔
34
    if (epSet.numOfEps == 0) {
42,008✔
35
      pMsg->pCont = NULL;
×
36
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
37
      return;
×
38
    }
39
    // dnode is not the mnode or mnode leader  and This ensures that the function correctly handles cases where the
40
    // dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
41
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
42,008✔
42
      pMsg->pCont = NULL;
×
43
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
44
      return;
×
45
    }
46
  }
47

48
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
100,323✔
49
  pMsg->pCont = rpcMallocCont(contLen);
99,413✔
50
  if (pMsg->pCont == NULL) {
101,532✔
51
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
52
  } else {
53
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
101,532✔
54
    if (contLen < 0) {
101,532✔
55
      pMsg->code = contLen;
×
56
      return;
×
57
    }
58
    pMsg->contLen = contLen;
101,532✔
59
  }
60
}
61

62
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
2,122,475,459✔
63
  const STraceId *trace = &pMsg->info.traceId;
2,122,475,459✔
64

65
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
2,122,657,377✔
66
  if (msgFp == NULL) {
2,122,411,341✔
67
    // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
68
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
×
69
    return TSDB_CODE_MSG_NOT_PROCESSED;
×
70
  }
71

72
  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
2,122,411,341✔
73
  pMsg->info.wrapper = pWrapper;
2,122,411,341✔
74
  return (*msgFp)(pWrapper->pMgmt, pMsg);
2,122,572,220✔
75
}
76

77
static bool dmFailFastFp(tmsg_t msgType) {
×
78
  // add more msg type later
79
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
×
80
}
81

82
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
11,540,650✔
83
  if (code != TSDB_CODE_APP_IS_STOPPING) {
11,540,650✔
84
    return code;
2,599,131✔
85
  }
86
  if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
8,941,519✔
87
      (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
4,064,333✔
88
    code = TSDB_CODE_VND_STOPPED;
2,455,114✔
89
  }
90
  return code;
8,941,519✔
91
}
92
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
1,781✔
93
  int32_t        code = 0;
1,781✔
94
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
1,781✔
95
  code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
1,781✔
96
  if (code < 0) {
1,781✔
97
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
98
    return;
×
99
  }
100
  code = rpcSetIpWhite(pTrans, &ipWhite);
1,781✔
101
  pData->ipWhiteVer = ipWhite.ver;
1,781✔
102

103
  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);
1,781✔
104

105
  rpcFreeCont(pRpc->pCont);
1,781✔
106
}
107

108
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
×
109
  int32_t code = TSDB_CODE_INVALID_MSG;
×
110
  dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
111
  rpcFreeCont(pRpc->pCont);
×
112
  pRpc->pCont = NULL;
×
113
  return;
×
114
}
115
static int32_t dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
2,147,483,647✔
116
  if (IP_FORBIDDEN_CHECK_WHITE_LIST(forbidden)) {
2,147,483,647✔
117
    dError("User:%s host:%s not in ip white list or in block white list", user, IP_ADDR_STR(clientIp));
×
118
    return TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
119

120
  } else if (IP_FORBIDDEN_CHECK_DATA_TIME_WHITE_LIST(forbidden)) {
2,147,483,647✔
121
    dError("User:%s host:%s alread expired", user, IP_ADDR_STR(clientIp));
×
122
    return TSDB_CODE_MND_USER_DISABLED;
×
123
  } else {
124
    return 0;
2,147,483,647✔
125
  }
126
}
127

128
static void dmUpdateRpcTimeWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
1,781✔
129
  int32_t        code = 0;
1,781✔
130
  SRetrieveDateTimeWhiteListRsp timeWhite = {0};
1,781✔
131
  code = tDeserializeSRetrieveDateTimeWhiteListRsp(pRpc->pCont, pRpc->contLen, &timeWhite);
1,781✔
132
  if (code < 0) {
1,781✔
133
    dError("failed to update rpc datetime-white since: %s", tstrerror(code));
×
134
    return;
×
135
  }
136
  // TODO: implement rpcSetTimeWhite
137
  code = rpcSetTimeIpWhite(pTrans, &timeWhite);
1,781✔
138
  pData->timeWhiteVer = timeWhite.ver;
1,781✔
139

140
  (void)tFreeSRetrieveDateTimeWhiteListRsp(&timeWhite);
1,781✔
141

142
  rpcFreeCont(pRpc->pCont);
1,781✔
143
}
144

145
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
146
  SRetrieveAnalyticAlgoRsp rsp = {0};
×
147
  if (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
×
148
    taosAnalyUpdate(rsp.ver, rsp.hash);
×
149
    rsp.hash = NULL;
×
150
  }
151
  tFreeRetrieveAnalyticAlgoRsp(&rsp);
×
152
  rpcFreeCont(pRpc->pCont);
×
153
}
×
154

155
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
2,147,483,647✔
156
  SDnodeTrans  *pTrans = &pDnode->trans;
2,147,483,647✔
157
  int32_t       code = -1;
2,147,483,647✔
158
  SRpcMsg      *pMsg = NULL;
2,147,483,647✔
159
  SMgmtWrapper *pWrapper = NULL;
2,147,483,647✔
160
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
2,147,483,647✔
161

162
  const STraceId *trace = &pRpc->info.traceId;
2,147,483,647✔
163
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64, TMSG_INFO(pRpc->msgType),
2,147,483,647✔
164
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));
165

166
  int32_t svrVer = 0;
2,147,483,647✔
167
  code = taosVersionStrToInt(td_version, &svrVer);
2,147,483,647✔
168
  if (code != 0) {
2,147,483,647✔
169
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
×
170
    goto _OVER;
×
171
  }
172
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
2,147,483,647✔
173
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
×
174
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
175
    goto _OVER;
×
176
  }
177

178
  code = dmIsForbiddenIp(pRpc->info.forbiddenIp, RPC_MSG_USER(pRpc), &pRpc->info.conn.cliAddr);
2,147,483,647✔
179
  if (code != 0) {
2,147,483,647✔
180
    goto _OVER;
×
181
  }
182

183
  switch (pRpc->msgType) {
2,147,483,647✔
184
    case TDMT_DND_NET_TEST:
×
185
      dmProcessNetTestReq(pDnode, pRpc);
×
186
      return;
10,981✔
187
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
177,890,854✔
188
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
189
    case TDMT_SCH_FETCH_RSP:
190
    case TDMT_SCH_MERGE_FETCH_RSP:
191
    case TDMT_VND_SUBMIT_RSP:
192
    case TDMT_MND_GET_DB_INFO_RSP:
193
    case TDMT_STREAM_FETCH_RSP:
194
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
195
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
196
    case TDMT_VND_SNODE_DROP_TABLE_RSP:
197
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
177,890,854✔
198
      return;
177,906,301✔
199
    case TDMT_MND_STATUS_RSP:
×
200
      if (pEpSet != NULL) {
×
201
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
×
202
      }
203
      break;
×
204
    case TDMT_MND_RETRIEVE_IP_WHITELIST_RSP:
×
205
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
×
206
      return;
×
207
    case TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL_RSP:
1,781✔
208
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
1,781✔
209
      return;
1,781✔
210
    case TDMT_MND_RETRIEVE_DATETIME_WHITELIST_RSP:
1,781✔
211
      dmUpdateRpcTimeWhite(&pDnode->data, pTrans->serverRpc, pRpc);
1,781✔
212
      return;
1,781✔
213
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
140,518✔
214
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
140,518✔
215
      return;
×
216
    default:
2,133,277,479✔
217
      break;
2,133,277,479✔
218
  }
219

220
  /*
221
  pDnode is null, TD-22618
222
  at trans.c line 91
223
  before this line, dmProcessRpcMsg callback is set
224
  after this line, parent is set
225
  so when dmProcessRpcMsg is called, pDonde is still null.
226
  */
227
  if (pDnode != NULL) {
2,133,277,479✔
228
    if (pDnode->status != DND_STAT_RUNNING) {
2,133,282,159✔
229
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
10,720,777✔
230
        dmProcessServerStartupStatus(pDnode, pRpc);
×
231
        return;
×
232
      } else {
233
        if (pDnode->status == DND_STAT_INIT) {
10,720,777✔
234
          code = TSDB_CODE_APP_IS_STARTING;
1,779,258✔
235
        } else {
236
          code = TSDB_CODE_APP_IS_STOPPING;
8,941,519✔
237
        }
238
        goto _OVER;
10,720,777✔
239
      }
240
    }
241
  } else {
242
    code = TSDB_CODE_APP_IS_STARTING;
194✔
243
    goto _OVER;
194✔
244
  }
245

246
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
2,122,471,518✔
247
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
×
248
    code = TSDB_CODE_INVALID_MSG_LEN;
×
249
    goto _OVER;
×
250
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
2,122,617,379✔
251
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
904,136✔
252
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
44,028✔
253
  }
254

255
  if (pHandle->defaultNtype == NODE_END) {
2,122,704,377✔
256
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
×
257
    code = TSDB_CODE_MSG_NOT_PROCESSED;
×
258
    goto _OVER;
×
259
  }
260

261
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
2,122,556,739✔
262
  if (pHandle->needCheckVgId) {
2,122,776,723✔
263
    if (pRpc->contLen > 0) {
1,110,929,226✔
264
      const SMsgHead *pHead = pRpc->pCont;
1,110,941,679✔
265
      const int32_t   vgId = ntohl(pHead->vgId);
1,110,926,331✔
266
      switch (vgId) {
1,110,769,745✔
267
        case QNODE_HANDLE:
×
268
          pWrapper = &pDnode->wrappers[QNODE];
×
269
          break;
×
270
        case SNODE_HANDLE:
3,005,891✔
271
          pWrapper = &pDnode->wrappers[SNODE];
3,005,891✔
272
          break;
3,005,891✔
273
        case MNODE_HANDLE:
42,522,050✔
274
          pWrapper = &pDnode->wrappers[MNODE];
42,522,050✔
275
          break;
42,552,813✔
276
        default:
1,065,241,804✔
277
          break;
1,065,241,804✔
278
      }
279
    } else {
280
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
430✔
281
      code = TSDB_CODE_INVALID_MSG_LEN;
430✔
282
      goto _OVER;
430✔
283
    }
284
  }
285

286
  if ((code = dmMarkWrapper(pWrapper)) != 0) {
2,122,618,670✔
287
    pWrapper = NULL;
101,534✔
288
    goto _OVER;
101,534✔
289
  }
290

291
  pRpc->info.wrapper = pWrapper;
2,122,522,631✔
292

293
  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
2,122,541,012✔
294
  if (IsReq(pRpc)) {
2,122,541,012✔
295
    if (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY)
2,030,860,097✔
296
      itype = DEF_QITEM;
60,613,753✔
297
    else
298
      itype = RPC_QITEM;
1,970,284,969✔
299
  } else {
300
    itype = DEF_QITEM;
91,786,392✔
301
  }
302
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
2,122,685,114✔
303
  if (code) goto _OVER;
2,122,627,940✔
304

305
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
2,122,627,940✔
306
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
2,122,627,940✔
307
          pRpc->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
308

309
  code = dmProcessNodeMsg(pWrapper, pMsg);
2,122,631,917✔
310

311
_OVER:
2,133,487,728✔
312
  if (code != 0) {
2,133,568,071✔
313
    code = dmConvertErrCode(pRpc->msgType, code);
11,540,048✔
314
    if (pMsg) {
11,539,730✔
315
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
717,347✔
316
    } else {
317
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
10,822,383✔
318
    }
319

320
    if (IsReq(pRpc)) {
11,539,730✔
321
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
11,501,235✔
322
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
11,501,887✔
323
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
101,532✔
324
      }
325

326
      if (pWrapper != NULL) {
11,501,887✔
327
        dmSendRsp(&rsp);
717,602✔
328
      } else {
329
        if (rpcSendResponse(&rsp) != 0) {
10,784,285✔
330
          dError("failed to send response, msg:%p", &rsp);
×
331
        }
332
      }
333
    } else if (NULL == pMsg && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
38,831✔
334
      destroyAhandle(pRpc->info.ahandle);
27,790✔
335
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
27,535✔
336
    }
337

338
    if (pMsg != NULL) {
11,539,853✔
339
      dGTrace("msg:%p, is freed", pMsg);
717,602✔
340
      taosFreeQitem(pMsg);
717,602✔
341
    }
342
    rpcFreeCont(pRpc->pCont);
11,539,853✔
343
    pRpc->pCont = NULL;
11,539,716✔
344
  }
345

346
  dmReleaseWrapper(pWrapper);
2,133,567,739✔
347
}
348

349
int32_t dmInitMsgHandle(SDnode *pDnode) {
707,062✔
350
  SDnodeTrans *pTrans = &pDnode->trans;
707,062✔
351

352
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
4,949,434✔
353
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
4,242,372✔
354
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
4,242,372✔
355
    if (pArray == NULL) return -1;
4,242,372✔
356

357
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
254,542,320✔
358
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
250,299,948✔
359
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
250,299,948✔
360
      if (pMgmt->needCheckVgId) {
250,299,948✔
361
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
27,575,418✔
362
      }
363
      if (!pMgmt->needCheckVgId) {
250,299,948✔
364
        pHandle->defaultNtype = ntype;
222,724,530✔
365
      }
366
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
250,299,948✔
367
    }
368

369
    taosArrayDestroy(pArray);
4,242,372✔
370
  }
371

372
  return 0;
707,062✔
373
}
374

375
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
184,323,682✔
376
  int32_t code = 0;
184,323,682✔
377
  SDnode *pDnode = dmInstance();
184,323,682✔
378
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
184,322,478✔
379
    rpcFreeCont(pMsg->pCont);
919,745✔
380
    pMsg->pCont = NULL;
919,745✔
381
    if (pDnode->status == DND_STAT_INIT) {
919,745✔
382
      code = TSDB_CODE_APP_IS_STARTING;
1,918✔
383
    } else {
384
      code = TSDB_CODE_APP_IS_STOPPING;
917,827✔
385
    }
386
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
919,745✔
387
           pMsg->info.handle);
388
    return code;
919,745✔
389
  } else {
390
    pMsg->info.handle = 0;
183,403,941✔
391
    code = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
183,405,473✔
392
    if (code != 0) {
183,408,722✔
393
      dError("failed to send rpc msg");
×
394
      return code;
×
395
    }
396
    return 0;
183,408,722✔
397
  }
398
}
399
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
192,100,094✔
400
  int32_t code = 0;
192,100,094✔
401
  SDnode *pDnode = dmInstance();
192,100,094✔
402
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
192,097,794✔
403
    rpcFreeCont(pMsg->pCont);
×
404
    pMsg->pCont = NULL;
×
405
    if (pDnode->status == DND_STAT_INIT) {
×
406
      code = TSDB_CODE_APP_IS_STARTING;
×
407
    } else {
408
      code = TSDB_CODE_APP_IS_STOPPING;
×
409
    }
410
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
411
           pMsg->info.handle);
412
    return code;
×
413
  } else {
414
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
192,098,185✔
415
  }
416
}
417

418
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
390,577,969✔
419

420
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
388,597,084✔
421
  (void)rpcReleaseHandle(pHandle, type, status);
388,597,084✔
422
}
388,606,311✔
423

424
static bool rpcRfp(int32_t code, tmsg_t msgType) {
51,188,697✔
425
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
51,188,697✔
426
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
47,079,907✔
427
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
34,294,919✔
428
      code == TSDB_CODE_APP_IS_STOPPING) {
429
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
18,845,644✔
430
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE) {
18,848,089✔
431
      return false;
×
432
    }
433
    return true;
18,848,089✔
434
  } else {
435
    return false;
32,343,053✔
436
  }
437
}
438
static bool rpcNoDelayMsg(tmsg_t msgType) {
×
439
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_QUERY_SSMIGRATE_PROGRESS ||
×
440
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE ||
×
441
      msgType == TDMT_VND_QUERY_SCAN_PROGRESS || msgType == TDMT_VND_QUERY_TRIM_PROGRESS) {
×
442
    return true;
×
443
  }
444
  return false;
×
445
}
446
int32_t dmInitClient(SDnode *pDnode) {
707,006✔
447
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
448

449
  SRpcInit rpcInit = {0};
707,006✔
450
  rpcInit.label = "DNODE-CLI";
707,006✔
451
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
707,006✔
452
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
707,006✔
453
  rpcInit.sessions = 1024;
707,006✔
454
  rpcInit.connType = TAOS_CONN_CLIENT;
707,006✔
455
  rpcInit.user = TSDB_DEFAULT_USER;
707,006✔
456
  rpcInit.idleTime = tsShellActivityTimer * 1000;
707,006✔
457
  rpcInit.parent = pDnode;
707,006✔
458
  rpcInit.rfp = rpcRfp;
707,006✔
459
  rpcInit.compressSize = tsCompressMsgSize;
707,006✔
460
  rpcInit.dfp = destroyAhandle;
707,006✔
461

462
  rpcInit.retryMinInterval = tsRedirectPeriod;
707,006✔
463
  rpcInit.retryStepFactor = tsRedirectFactor;
707,006✔
464
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
707,006✔
465
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
707,006✔
466

467
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
707,006✔
468
  rpcInit.failFastThreshold = 3;    // failed threshold
707,006✔
469
  rpcInit.ffp = dmFailFastFp;
707,006✔
470

471
  rpcInit.noDelayFp = rpcNoDelayMsg;
707,006✔
472

473
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
707,006✔
474
  connLimitNum = TMAX(connLimitNum, 10);
707,006✔
475
  connLimitNum = TMIN(connLimitNum, 500);
707,006✔
476

477
  rpcInit.connLimitNum = connLimitNum;
707,006✔
478
  rpcInit.connLimitLock = 1;
707,006✔
479
  rpcInit.supportBatch = 1;
707,006✔
480
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
707,006✔
481
  rpcInit.shareConn = 1;
707,006✔
482
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
707,006✔
483
  rpcInit.notWaitAvaliableConn = 0;
707,006✔
484
  rpcInit.startReadTimer = 1;
707,006✔
485
  rpcInit.readTimeout = tsReadTimeout;
707,006✔
486
  rpcInit.ipv6 = tsEnableIpv6;
707,006✔
487
  rpcInit.enableSSL = tsEnableTLS;
707,006✔
488
  rpcInit.enableSasl = tsEnableSasl;
707,006✔
489

490
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
707,006✔
491
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
707,006✔
492
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
707,006✔
493
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
707,006✔
494
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
707,006✔
495

496
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
707,006✔
497
    dError("failed to convert version string:%s to int", td_version);
×
498
  }
499

500
  pTrans->clientRpc = rpcOpen(&rpcInit);
707,006✔
501
  if (pTrans->clientRpc == NULL) {
707,006✔
502
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
×
503
    return terrno;
×
504
  }
505

506
  dDebug("dnode rpc client is initialized");
707,006✔
507
  return 0;
707,006✔
508
}
509
int32_t dmInitStatusClient(SDnode *pDnode) {
707,006✔
510
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
511

512
  SRpcInit rpcInit = {0};
707,006✔
513
  rpcInit.label = "DNODE-STA-CLI";
707,006✔
514
  rpcInit.numOfThreads = 1;
707,006✔
515
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
707,006✔
516
  rpcInit.sessions = 1024;
707,006✔
517
  rpcInit.connType = TAOS_CONN_CLIENT;
707,006✔
518
  rpcInit.user = TSDB_DEFAULT_USER;
707,006✔
519
  rpcInit.idleTime = tsShellActivityTimer * 1000;
707,006✔
520
  rpcInit.parent = pDnode;
707,006✔
521
  rpcInit.rfp = rpcRfp;
707,006✔
522
  rpcInit.compressSize = tsCompressMsgSize;
707,006✔
523

524
  rpcInit.retryMinInterval = tsRedirectPeriod;
707,006✔
525
  rpcInit.retryStepFactor = tsRedirectFactor;
707,006✔
526
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
707,006✔
527
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
707,006✔
528

529
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
707,006✔
530
  rpcInit.failFastThreshold = 3;    // failed threshold
707,006✔
531
  rpcInit.ffp = dmFailFastFp;
707,006✔
532

533
  int32_t connLimitNum = 100;
707,006✔
534
  connLimitNum = TMAX(connLimitNum, 10);
707,006✔
535
  connLimitNum = TMIN(connLimitNum, 500);
707,006✔
536

537
  rpcInit.connLimitNum = connLimitNum;
707,006✔
538
  rpcInit.connLimitLock = 1;
707,006✔
539
  rpcInit.supportBatch = 1;
707,006✔
540
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
707,006✔
541
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
707,006✔
542
  rpcInit.startReadTimer = 0;
707,006✔
543
  rpcInit.readTimeout = 0;
707,006✔
544
  rpcInit.ipv6 = tsEnableIpv6;
707,006✔
545
  rpcInit.enableSasl = tsEnableSasl;
707,006✔
546

547
  rpcInit.enableSSL = tsEnableTLS;
707,006✔
548
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
707,006✔
549
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
707,006✔
550
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
707,006✔
551
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
707,006✔
552
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
707,006✔
553

554
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
707,006✔
555
    dError("failed to convert version string:%s to int", td_version);
×
556
  }
557

558
  pTrans->statusRpc = rpcOpen(&rpcInit);
707,006✔
559
  if (pTrans->statusRpc == NULL) {
707,006✔
560
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
×
561
    return terrno;
×
562
  }
563

564
  dDebug("dnode rpc status client is initialized");
707,006✔
565
  return 0;
707,006✔
566
}
567

568
int32_t dmInitSyncClient(SDnode *pDnode) {
707,006✔
569
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
570

571
  SRpcInit rpcInit = {0};
707,006✔
572
  rpcInit.label = "DNODE-SYNC-CLI";
707,006✔
573
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
707,006✔
574
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
707,006✔
575
  rpcInit.sessions = 1024;
707,006✔
576
  rpcInit.connType = TAOS_CONN_CLIENT;
707,006✔
577
  rpcInit.user = TSDB_DEFAULT_USER;
707,006✔
578
  rpcInit.idleTime = tsShellActivityTimer * 1000;
707,006✔
579
  rpcInit.parent = pDnode;
707,006✔
580
  rpcInit.rfp = rpcRfp;
707,006✔
581
  rpcInit.compressSize = tsCompressMsgSize;
707,006✔
582

583
  rpcInit.retryMinInterval = tsRedirectPeriod;
707,006✔
584
  rpcInit.retryStepFactor = tsRedirectFactor;
707,006✔
585
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
707,006✔
586
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
707,006✔
587

588
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
707,006✔
589
  rpcInit.failFastThreshold = 3;    // failed threshold
707,006✔
590
  rpcInit.ffp = dmFailFastFp;
707,006✔
591

592
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
707,006✔
593
  connLimitNum = TMAX(connLimitNum, 10);
707,006✔
594
  connLimitNum = TMIN(connLimitNum, 500);
707,006✔
595

596
  rpcInit.connLimitNum = connLimitNum;
707,006✔
597
  rpcInit.connLimitLock = 1;
707,006✔
598
  rpcInit.supportBatch = 1;
707,006✔
599
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
707,006✔
600
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
707,006✔
601
  rpcInit.startReadTimer = 1;
707,006✔
602
  rpcInit.readTimeout = tsReadTimeout;
707,006✔
603
  rpcInit.ipv6 = tsEnableIpv6;
707,006✔
604
  rpcInit.enableSSL = tsEnableTLS;
707,006✔
605
  rpcInit.enableSasl = tsEnableSasl;
707,006✔
606

607
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
707,006✔
608
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
707,006✔
609
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
707,006✔
610
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
707,006✔
611
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
707,006✔
612
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
707,006✔
613
    dError("failed to convert version string:%s to int", td_version);
×
614
  }
615

616
  pTrans->syncRpc = rpcOpen(&rpcInit);
707,006✔
617
  if (pTrans->syncRpc == NULL) {
707,006✔
618
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
×
619
    return terrno;
×
620
  }
621

622
  dDebug("dnode rpc sync client is initialized");
707,006✔
623
  return 0;
707,006✔
624
}
625

626
void dmCleanupClient(SDnode *pDnode) {
707,006✔
627
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
628
  if (pTrans->clientRpc) {
707,006✔
629
    rpcClose(pTrans->clientRpc);
707,006✔
630
    pTrans->clientRpc = NULL;
707,006✔
631
    dDebug("dnode rpc client is closed");
707,006✔
632
  }
633
}
707,006✔
634
void dmCleanupStatusClient(SDnode *pDnode) {
707,006✔
635
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
636
  if (pTrans->statusRpc) {
707,006✔
637
    rpcClose(pTrans->statusRpc);
707,006✔
638
    pTrans->statusRpc = NULL;
707,006✔
639
    dDebug("dnode rpc status client is closed");
707,006✔
640
  }
641
}
707,006✔
642
void dmCleanupSyncClient(SDnode *pDnode) {
707,006✔
643
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
644
  if (pTrans->syncRpc) {
707,006✔
645
    rpcClose(pTrans->syncRpc);
707,006✔
646
    pTrans->syncRpc = NULL;
707,006✔
647
    dDebug("dnode rpc sync client is closed");
707,006✔
648
  }
649
}
707,006✔
650

651
int32_t dmInitServer(SDnode *pDnode) {
707,062✔
652
  int32_t      code = 0;
707,062✔
653
  SDnodeTrans *pTrans = &pDnode->trans;
707,062✔
654

655
  SRpcInit rpcInit = {0};
707,062✔
656
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
707,062✔
657

658
  rpcInit.localPort = tsServerPort;
707,062✔
659
  rpcInit.label = "DND-S";
707,062✔
660
  rpcInit.numOfThreads = tsNumOfRpcThreads;
707,062✔
661
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
707,062✔
662
  rpcInit.sessions = tsMaxShellConns;
707,062✔
663
  rpcInit.connType = TAOS_CONN_SERVER;
707,062✔
664
  rpcInit.idleTime = tsShellActivityTimer * 1000;
707,062✔
665
  rpcInit.parent = pDnode;
707,062✔
666
  rpcInit.compressSize = tsCompressMsgSize;
707,062✔
667
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
707,062✔
668
  rpcInit.ipv6 = tsEnableIpv6;
707,062✔
669
  rpcInit.enableSSL = tsEnableTLS;
707,062✔
670
  rpcInit.enableSasl = tsEnableSasl;
707,062✔
671

672
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
707,062✔
673
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
707,062✔
674
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
707,062✔
675
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
707,062✔
676
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
707,062✔
677

678
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
707,062✔
679
    dError("failed to convert version string:%s to int", td_version);
×
680
  }
681

682
  pTrans->serverRpc = rpcOpen(&rpcInit);
707,062✔
683
  if (pTrans->serverRpc == NULL) {
707,062✔
684
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
56✔
685
    return terrno;
56✔
686
  }
687

688
  dDebug("dnode rpc server is initialized");
707,006✔
689
  return 0;
707,006✔
690
}
691

692
void dmCleanupServer(SDnode *pDnode) {
707,006✔
693
  SDnodeTrans *pTrans = &pDnode->trans;
707,006✔
694
  if (pTrans->serverRpc) {
707,006✔
695
    rpcClose(pTrans->serverRpc);
707,006✔
696
    pTrans->serverRpc = NULL;
707,006✔
697
    dDebug("dnode rpc server is closed");
707,006✔
698
  }
699
}
707,006✔
700

701
SMsgCb dmGetMsgcb(SDnode *pDnode) {
6,866,372✔
702
  SMsgCb msgCb = {
41,180,352✔
703
      .clientRpc = pDnode->trans.clientRpc,
6,866,372✔
704
      .serverRpc = pDnode->trans.serverRpc,
6,866,372✔
705
      .statusRpc = pDnode->trans.statusRpc,
6,866,372✔
706
      .syncRpc = pDnode->trans.syncRpc,
6,866,372✔
707
      .sendReqFp = dmSendReq,
708
      .sendSyncReqFp = dmSendSyncReq,
709
      .sendRspFp = dmSendRsp,
710
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
711
      .releaseHandleFp = dmReleaseHandle,
712
      .reportStartupFp = dmReportStartup,
713
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
714
      .getDnodeEpFp = dmGetDnodeEp,
715
      .data = &pDnode->data,
6,866,372✔
716
  };
717
  return msgCb;
6,866,372✔
718
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc