• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4876

10 Dec 2025 05:56AM UTC coverage: 64.632% (+0.2%) from 64.472%
#4876

push

travis-ci

guanshengliang
test: fix idmp case with checkDataMemLoop checked (#33862)

4 of 9 new or added lines in 3 files covered. (44.44%)

380 existing lines in 104 files now uncovered.

162866 of 251990 relevant lines covered (64.63%)

107950382.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.13
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg) || TDMT_STREAM_TRIGGER_DROP_RSP == (_msg))
23

24
// Hands pMsg to rpcSendResponse(); a non-zero result is logged and otherwise ignored.
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }
  dError("failed to send response, msg:%p", pMsg);
}
959,308✔
29

30
/*
 * Fills pMsg with a serialized mnode epSet so the peer can be redirected.
 *
 * The epSet comes from dmGetMnodeEpSetForRedirect(). On success pMsg->pCont
 * holds the serialized epSet and pMsg->contLen its length. On any failure
 * pMsg->pCont is left NULL and pMsg->code carries the reason:
 *   - TSDB_CODE_MNODE_NOT_FOUND  no usable epSet (see below)
 *   - TSDB_CODE_OUT_OF_MEMORY    the payload buffer could not be allocated
 *   - a negative serializer code when tSerializeSEpSet() fails
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    // An empty epSet, or one whose only endpoint is this dnode itself, gives
    // the peer nothing useful to redirect to, so report "mnode not found"
    // instead of returning a misleading epSet.
    bool onlySelf = epSet.numOfEps != 0 && strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 &&
                    epSet.eps[0].port == tsServerPort;
    if (epSet.numOfEps == 0 || onlySelf) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass only measures the encoded size; never allocate from a negative length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not ship a half-written buffer together with an error code.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }
  pMsg->contLen = contLen;
}
62

63
/*
 * Dispatches pMsg to the handler registered in pWrapper->msgFps for its
 * message type.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED when no
 * handler is registered. pMsg->info.wrapper is set to pWrapper before the
 * handler runs.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // Not referenced by name below; presumably consumed by the dG* log macros — confirm.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return handler(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
77

78
// Returns true for the message types subject to fail-fast handling
// (currently sync heartbeat and sync append-entries); more may be added later.
static bool dmFailFastFp(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_SYNC_HEARTBEAT:
    case TDMT_SYNC_APPEND_ENTRIES:
      return true;
    default:
      return false;
  }
}
82

83
/*
 * Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED for message types
 * strictly inside the vnode or scheduler ranges; every other code (and every
 * other message type) is returned unchanged.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code != TSDB_CODE_APP_IS_STOPPING) {
    return code;
  }

  bool inVnodeRange = msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX;
  bool inSchedRange = msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX;
  return (inVnodeRange || inSchedRange) ? TSDB_CODE_VND_STOPPED : code;
}
93
/*
 * Applies an IP white-list response (dual format) to the RPC instance pTrans
 * and records the list version in pData->ipWhiteVer.
 *
 * The response payload pRpc->pCont is released on every path, including the
 * deserialization-failure path. A failure of rpcSetIpWhite() is logged; the
 * version is still recorded, as before.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
108

109
// Handler for the IP white-list response that is not applied: logs
// TSDB_CODE_INVALID_MSG and releases the payload. pDnode and pTrans are not used.
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  dError("failed to update rpc ip-white since: %s", tstrerror(TSDB_CODE_INVALID_MSG));
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
116
// Returns true (after logging user and client address) when the forbidden flag
// is non-zero; returns false otherwise.
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (!forbidden) {
    return false;
  }
  dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
  return true;
}
124

125
/*
 * Handles a date-time white-list response: records its version in
 * pData->timeWhiteVer. The list itself is not yet pushed into the RPC layer.
 *
 * The response payload pRpc->pCont is released on every path, including the
 * deserialization-failure path. pTrans is currently unused.
 */
static void dmUpdateRpcTimeWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveDateTimeWhiteListRsp timeWhite = {0};

  int32_t code = tDeserializeSRetrieveDateTimeWhiteListRsp(pRpc->pCont, pRpc->contLen, &timeWhite);
  if (code < 0) {
    dError("failed to update rpc datetime-white since: %s", tstrerror(code));
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return;
  }

  // TODO: implement rpcSetTimeWhite and apply timeWhite to pTrans here.
  pData->timeWhiteVer = timeWhite.ver;

  (void)tFreeSRetrieveDateTimeWhiteListRsp(&timeWhite);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
141

142
/*
 * Handles an analytic-algorithm response: on successful deserialization the
 * version and hash are passed to taosAnalyUpdate(). The hash pointer is then
 * cleared before the response is freed, presumably because taosAnalyUpdate()
 * takes ownership of it — confirm. The payload is released in every case.
 * pData and pTrans are not used.
 */
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp algoRsp = {0};

  bool parsed = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0);
  if (parsed) {
    taosAnalyUpdate(algoRsp.ver, algoRsp.hash);
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
×
151

152
/*
 * RPC callback: validates an incoming message and routes it to a management
 * node wrapper.
 *
 * Steps, in order:
 *   1. check client/server version compatibility and the forbidden-IP flag;
 *   2. handle a fixed set of message types directly (net test, query-worker
 *      responses, white-list / analytic-algorithm responses);
 *   3. reject messages while the dnode is not running;
 *   4. pick the target wrapper (optionally by the vgId in the message head),
 *      copy the message into a queue item and pass it to dmProcessNodeMsg().
 *
 * On any failure (code != 0) a response carrying the converted error code is
 * sent for requests, the queue item (if any) is freed and pRpc->pCont is
 * released. The wrapper is released on both the success and failure paths
 * that reach _OVER.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  SMgmtWrapper *pWrapper = NULL;
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];

  const STraceId *trace = &pRpc->info.traceId;
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64, TMSG_INFO(pRpc->msgType),
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));

  // --- version and white-list checks ---
  int32_t svrVer = 0;
  code = taosVersionStrToInt(td_version, &svrVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }

  code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3);
  if (code != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, svrVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  if (dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr)) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // --- message types handled here without going through a wrapper ---
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
    case TDMT_STREAM_FETCH_RSP:
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
    case TDMT_VND_SNODE_DROP_TABLE_RSP:
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;
    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_RSP:
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_IP_WHITELIST_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_DATETIME_WHITELIST_RSP:
      dmUpdateRpcTimeWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;
  }

  /*
   * pDnode may be NULL here (TD-22618): at trans.c line 91 this callback is
   * installed before the parent pointer is set, so a message arriving in
   * between is delivered with pDnode still NULL.
   */
  if (pDnode == NULL) {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }
    code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
    goto _OVER;
  }

  // --- payload sanity ---
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  }
  if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
      (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; processing continues.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  // --- wrapper selection ---
  if (pHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
  if (pHandle->needCheckVgId) {
    if (pRpc->contLen <= 0) {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }

    // The vgId in the message head may override the default wrapper.
    const SMsgHead *pHead = pRpc->pCont;
    const int32_t   vgId = ntohl(pHead->vgId);
    switch (vgId) {
      case QNODE_HANDLE:
        pWrapper = &pDnode->wrappers[QNODE];
        break;
      case SNODE_HANDLE:
        pWrapper = &pDnode->wrappers[SNODE];
        break;
      case MNODE_HANDLE:
        pWrapper = &pDnode->wrappers[MNODE];
        break;
      default:
        break;
    }
  }

  code = dmMarkWrapper(pWrapper);
  if (code != 0) {
    // Marking failed, so there is nothing to release at _OVER.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // --- enqueue ---
  // Only requests other than sync heartbeat / heartbeat-reply use RPC_QITEM;
  // responses are not restricted by tsQueueMemoryUsed.
  bool    isSyncHeartbeat = (pRpc->msgType == TDMT_SYNC_HEARTBEAT || pRpc->msgType == TDMT_SYNC_HEARTBEAT_REPLY);
  EQItype itype = (IsReq(pRpc) && !isSyncHeartbeat) ? RPC_QITEM : DEF_QITEM;

  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
  if (code) goto _OVER;

  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
          pRpc->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  code = dmProcessNodeMsg(pWrapper, pMsg);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pMsg) {
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      // The response type is the request type + 1.
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }

      if (pWrapper != NULL) {
        dmSendRsp(&rsp);
      } else if (rpcSendResponse(&rsp) != 0) {
        dError("failed to send response, msg:%p", &rsp);
      }
    } else if (NULL == pMsg && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
      destroyAhandle(pRpc->info.ahandle);
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
    }

    if (pMsg != NULL) {
      dGTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
346

347
/*
 * Builds the message dispatch tables from the handle lists returned by every
 * node type's getHandlesFp().
 *
 * For each handle: the wrapper's msgFps slot gets the handler function; the
 * dnode-wide msgHandles slot either inherits needCheckVgId (when set) or
 * records this node type as the default target (when not set).
 *
 * Returns 0 on success, -1 when a node type returns no handle list.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    SArray       *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pHandles); ++i) {
      SMgmtHandle  *pMgmt = taosArrayGet(pHandles, i);
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pHandle->defaultNtype = ntype;
      }
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
372

373
/*
 * Sends pMsg to pEpSet through the dnode's clientRpc.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are refused: the payload is freed and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 * Otherwise info.handle is reset and the rpcSendRequest() result is returned.
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG_MIN) {
    pMsg->info.handle = 0;
    int32_t sendCode = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    if (sendCode != 0) {
      dError("failed to send rpc msg");
    }
    return sendCode;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
397
/*
 * Sends pMsg to pEpSet through the dnode's syncRpc.
 *
 * While the dnode is not running, messages whose type is below
 * TDMT_SYNC_MSG_MIN are refused: the payload is freed and
 * TSDB_CODE_APP_IS_STARTING / TSDB_CODE_APP_IS_STOPPING is returned.
 * Otherwise the rpcSendRequest() result is returned as-is.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  if (pDnode->status == DND_STAT_RUNNING || pMsg->msgType >= TDMT_SYNC_MSG_MIN) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
415

416
// Thin adapter over rpcRegisterBrokenLinkArg(); its result is deliberately discarded.
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
253,414,674✔
417

418
// Thin adapter over rpcReleaseHandle(); its result is deliberately discarded.
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle, type, status);
  return;
}
251,406,366✔
421

422
/*
 * Retry filter installed as SRpcInit.rfp: returns true when code is one of
 * the listed network / not-leader / restoring / starting / stopping errors
 * AND msgType is not one of the scheduler query/fetch/notify types or
 * TDMT_VND_DROP_TTL_TABLE. Returns false in every other case.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool matchesCode = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                     code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                     code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING ||
                     code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
                     code == TSDB_CODE_APP_IS_STOPPING;
  if (!matchesCode) {
    return false;
  }

  bool excludedMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                     msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY ||
                     msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedMsg;
}
436
// Predicate installed as SRpcInit.noDelayFp: true for the TTL / progress-query
// vnode message types listed below, false for everything else.
static bool rpcNoDelayMsg(tmsg_t msgType) {
  switch (msgType) {
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_QUERY_SSMIGRATE_PROGRESS:
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
    case TDMT_VND_DROP_TTL_TABLE:
    case TDMT_VND_QUERY_SCAN_PROGRESS:
    case TDMT_VND_QUERY_TRIM_PROGRESS:
      return true;
    default:
      return false;
  }
}
444
/*
 * Opens the general-purpose client RPC instance ("DNODE-CLI") and stores it
 * in pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and otherwise ignored.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.dfp = destroyAhandle;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  rpcInit.noDelayFp = rpcNoDelayMsg;

  // Per-thread connection limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.shareConn = 1;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.notWaitAvaliableConn = 0;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;

  // Bounded copies: the destination fields are fixed-size buffers inside
  // rpcInit, so never copy more than they can hold (NOTE(review): assumes the
  // path fields are char arrays, which the previous memcpy into them implies).
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
506
/*
 * Opens the single-threaded status client RPC instance ("DNODE-STA-CLI") and
 * stores it in pDnode->trans.statusRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and otherwise ignored.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-STA-CLI";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Fixed limit; already inside the [10, 500] range the other clients clamp to.
  rpcInit.connLimitNum = 100;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 0;
  rpcInit.readTimeout = 0;
  rpcInit.ipv6 = tsEnableIpv6;

  rpcInit.enableSSL = tsEnableTLS;
  // Bounded copies: the destination fields are fixed-size buffers inside
  // rpcInit, so never copy more than they can hold (NOTE(review): assumes the
  // path fields are char arrays, which the previous memcpy into them implies).
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
563

564
/*
 * Opens the sync client RPC instance ("DNODE-SYNC-CLI") and stores it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and otherwise ignored.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  rpcInit.label = "DNODE-SYNC-CLI";
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;

  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
  rpcInit.failFastThreshold = 3;    // failed threshold
  rpcInit.ffp = dmFailFastFp;

  // Half of the general client's per-thread limit, clamped to [10, 500].
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  rpcInit.connLimitNum = connLimitNum;
  rpcInit.connLimitLock = 1;
  rpcInit.supportBatch = 1;
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.startReadTimer = 1;
  rpcInit.readTimeout = tsReadTimeout;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;

  // Bounded copies: the destination fields are fixed-size buffers inside
  // rpcInit, so never copy more than they can hold (NOTE(review): assumes the
  // path fields are char arrays, which the previous memcpy into them implies).
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
620

621
// Closes pDnode->trans.clientRpc if it is open and clears the pointer.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
677,510✔
629
// Closes pDnode->trans.statusRpc if it is open and clears the pointer.
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->statusRpc) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
677,510✔
637
// Closes pDnode->trans.syncRpc if it is open and clears the pointer.
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->syncRpc) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
677,510✔
645

646
/*
 * Opens the server-side RPC instance ("DND-S") bound to tsLocalFqdn /
 * tsServerPort and stores it in pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen() fails. A failure to parse
 * td_version is logged and otherwise ignored.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  rpcInit.localPort = tsServerPort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.parent = pDnode;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
  rpcInit.ipv6 = tsEnableIpv6;
  rpcInit.enableSSL = tsEnableTLS;

  // Bounded copies: the destination fields are fixed-size buffers inside
  // rpcInit, so never copy more than they can hold (NOTE(review): assumes the
  // path fields are char arrays, which the previous memcpy into them implies).
  tstrncpy(rpcInit.caPath, tsTLSCaPath, sizeof(rpcInit.caPath));
  tstrncpy(rpcInit.certPath, tsTLSSvrCertPath, sizeof(rpcInit.certPath));
  tstrncpy(rpcInit.keyPath, tsTLSSvrKeyPath, sizeof(rpcInit.keyPath));
  tstrncpy(rpcInit.cliCertPath, tsTLSCliCertPath, sizeof(rpcInit.cliCertPath));
  tstrncpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, sizeof(rpcInit.cliKeyPath));

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
685

686
// Closes pDnode->trans.serverRpc if it is open and clears the pointer.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
677,510✔
694

695
/*
 * Returns, by value, a message-callback table wired to this dnode: its four
 * RPC instances, the send / release / report helpers, and a pointer to
 * pDnode->data. Fields not listed here are zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {0};

  // RPC instances owned by the dnode transport.
  msgCb.clientRpc = pDnode->trans.clientRpc;
  msgCb.serverRpc = pDnode->trans.serverRpc;
  msgCb.statusRpc = pDnode->trans.statusRpc;
  msgCb.syncRpc = pDnode->trans.syncRpc;

  // Callbacks.
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendSyncReqFp = dmSendSyncReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;
  msgCb.reportStartupFp = dmReportStartup;
  msgCb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  msgCb.getDnodeEpFp = dmGetDnodeEp;

  msgCb.data = &pDnode->data;
  return msgCb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc