• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.46
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

22
#define IS_STREAM_TRIGGER_RSP_MSG(_msg) (TDMT_STREAM_TRIGGER_CALC_RSP == (_msg) || TDMT_STREAM_TRIGGER_PULL_RSP == (_msg))
23

24
/*
 * Hands pMsg to rpcSendResponse. A non-zero result is only logged; nothing is
 * reported back to the caller.
 */
static inline void dmSendRsp(SRpcMsg *pMsg) {
  if (rpcSendResponse(pMsg) == 0) {
    return;
  }

  dError("failed to send response, msg:%p", pMsg);
}
6,338✔
29

30
/*
 * Fills pMsg with a redirect payload: the mnode endpoint set returned by
 * dmGetMnodeEpSetForRedirect, serialized into a buffer from rpcMallocCont.
 *
 * On success pMsg->pCont / pMsg->contLen describe the serialized epSet and
 * pMsg->code is left untouched. On every failure path pMsg->pCont is NULL and
 * pMsg->code carries the reason:
 *   - TSDB_CODE_MNODE_NOT_FOUND   no usable endpoint (see below)
 *   - TSDB_CODE_OUT_OF_MEMORY     rpcMallocCont returned NULL
 *   - a negative value            returned by tSerializeSEpSet
 */
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
  SEpSet epSet = {0};
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);

  if (epSet.numOfEps <= 1) {
    if (epSet.numOfEps == 0) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
    // The only endpoint is this dnode itself (same fqdn and port). Redirecting
    // the caller back here would be misleading, so report "mnode not found"
    // rather than return such an epSet.
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
      pMsg->pCont = NULL;
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
      return;
    }
  }

  // First pass (NULL buffer) only computes the encoded size. A negative result
  // is an error code and must not be used as an allocation size.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  if (contLen < 0) {
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->pCont = rpcMallocCont(contLen);
  if (pMsg->pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
  if (contLen < 0) {
    // Do not leave a half-written buffer attached to a message whose contLen
    // was never set.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    pMsg->code = contLen;
    return;
  }

  pMsg->contLen = contLen;
}
62

63
/*
 * Dispatches pMsg to the handler that pWrapper registered for its message type.
 *
 * Returns TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered; otherwise
 * stores pWrapper in pMsg->info.wrapper and returns whatever the handler
 * returns.
 */
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  // NOTE(review): `trace` has no direct use here; it is presumably read by the
  // dG* logging macros, so keep the name as is — confirm against the macro.
  const STraceId *trace = &pMsg->info.traceId;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
  if (handler != NULL) {
    dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
    pMsg->info.wrapper = pWrapper;
    return handler(pWrapper->pMgmt, pMsg);
  }

  dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  return TSDB_CODE_MSG_NOT_PROCESSED;
}
77

78
/*
 * Returns true for TDMT_SYNC_HEARTBEAT and TDMT_SYNC_APPEND_ENTRIES, false for
 * every other message type. Installed as the `ffp` callback of the RPC clients
 * in this file. More message types may be added later.
 */
static bool dmFailFastFp(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_HEARTBEAT) {
    return true;
  }
  return msgType == TDMT_SYNC_APPEND_ENTRIES;
}
82

83
/*
 * Maps TSDB_CODE_APP_IS_STOPPING to TSDB_CODE_VND_STOPPED when msgType lies
 * strictly inside the TDMT_VND_MSG_* or TDMT_SCH_MSG_* range. Any other code,
 * or any other message type, is returned unchanged.
 */
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
  if (code != TSDB_CODE_APP_IS_STOPPING) {
    return code;
  }

  bool inVnodeRange = (msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX);
  bool inSchRange = (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX);

  return (inVnodeRange || inSchRange) ? TSDB_CODE_VND_STOPPED : code;
}
93
/*
 * Handles a dual-stack ip-white-list response: decodes the payload, passes the
 * list to rpcSetIpWhite on pTrans, and records the list version in
 * pData->ipWhiteVer.
 *
 * The message payload is released on every path, including a decode failure,
 * matching the other response handlers in this file. pRpc->pCont is set to
 * NULL afterwards so the freed pointer cannot be reused.
 */
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SUpdateIpWhite ipWhite = {0};

  int32_t code = tDeserializeSUpdateIpWhiteDual(pRpc->pCont, pRpc->contLen, &ipWhite);
  if (code < 0) {
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
    // NOTE(review): ipWhite is not released here, as before; confirm whether
    // the deserializer cleans up a partially decoded request on failure.
    goto _OVER;
  }

  code = rpcSetIpWhite(pTrans, &ipWhite);
  if (code != 0) {
    // Previously ignored silently. The version is still recorded below, as
    // before, so this only adds visibility.
    dError("failed to set rpc ip-white since: %s", tstrerror(code));
  }
  pData->ipWhiteVer = ipWhite.ver;

  (void)tFreeSUpdateIpWhiteDualReq(&ipWhite);

_OVER:
  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
108

109
/*
 * Handler for the ip-white-list response type that is not processed here: it
 * logs TSDB_CODE_INVALID_MSG and releases the payload. pDnode and pTrans are
 * not used.
 */
static void dmUpdateRpcIpWhiteUnused(SDnodeData *pDnode, void *pTrans, SRpcMsg *pRpc) {
  dError("failed to update rpc ip-white since: %s", tstrerror(TSDB_CODE_INVALID_MSG));

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;
}
116
/*
 * Turns the `forbidden` flag into a bool, logging the user and client address
 * when the flag is set.
 */
static bool dmIsForbiddenIp(int8_t forbidden, char *user, SIpAddr *clientIp) {
  if (!forbidden) {
    return false;
  }

  dError("User:%s host:%s not in ip white list", user, IP_ADDR_STR(clientIp));
  return true;
}
124

125
/*
 * Handles an analytic-algorithm retrieval response: on a successful decode the
 * version and hash are passed to taosAnalyUpdate. The decoded response and the
 * message payload are released in all cases. pData and pTrans are not used.
 */
static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
  SRetrieveAnalyticAlgoRsp algoRsp = {0};

  bool decoded = (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &algoRsp) == 0);
  if (decoded) {
    taosAnalyUpdate(algoRsp.ver, algoRsp.hash);
    // Detach the hash before the free below; presumably taosAnalyUpdate now
    // owns it — confirm.
    algoRsp.hash = NULL;
  }

  tFreeRetrieveAnalyticAlgoRsp(&algoRsp);
  rpcFreeCont(pRpc->pCont);
}
×
134

135
/*
 * RPC callback installed (as `cfp`) on every transport opened in this file.
 *
 * Flow, in order:
 *   1. Check client/server version compatibility and the forbidden-ip flag.
 *   2. Handle a few message types inline and return (net test, query-worker
 *      responses, ip-white-list and analytic-algorithm responses). A status
 *      response only updates the mnode epSet and then continues.
 *   3. Reject the message while the dnode is absent or not running, except
 *      TDMT_DND_SERVER_STATUS which is answered directly.
 *   4. Validate the payload, pick the target wrapper (optionally by the vgId in
 *      the message head), copy the message into a queue item and dispatch it
 *      through dmProcessNodeMsg.
 *
 * At _OVER, a non-zero code means the message was not handed off: requests get
 * an error response, a queue item (if any) is freed, and the payload is
 * released. The wrapper reference is released on every path through _OVER.
 */
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  SDnodeHandle *pMsgHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
  SMgmtWrapper *pWrapper = NULL;
  SRpcMsg      *pItem = NULL;
  int32_t       code = -1;

  // NOTE(review): the dG* logging macros presumably read a local named
  // `trace`; keep this name — confirm against the macro definitions.
  const STraceId *trace = &pRpc->info.traceId;
  dGDebug("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64 " %" PRIx64 ":%" PRIx64,
          TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle,
          pRpc->info.refId, TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));

  int32_t serverVer = 0;
  code = taosVersionStrToInt(td_version, &serverVer);
  if (code != 0) {
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
    goto _OVER;
  }

  code = taosCheckVersionCompatible(pRpc->info.cliVer, serverVer, 3);
  if (code != 0) {
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:%s", pRpc->info.cliVer, serverVer,
           IP_ADDR_STR(&pRpc->info.conn.cliAddr));
    goto _OVER;
  }

  if (dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, &pRpc->info.conn.cliAddr)) {
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
    goto _OVER;
  }

  // Message types that are consumed here and never reach a node queue.
  switch (pRpc->msgType) {
    case TDMT_DND_NET_TEST:
      dmProcessNetTestReq(pDnode, pRpc);
      return;

    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_MND_GET_DB_INFO_RSP:
    case TDMT_STREAM_FETCH_RSP:
    case TDMT_STREAM_FETCH_FROM_RUNNER_RSP:
    case TDMT_STREAM_FETCH_FROM_CACHE_RSP:
      // The result is not inspected; the function returns right away.
      (void)qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
      return;

    case TDMT_MND_STATUS_RSP:
      if (pEpSet != NULL) {
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
      }
      break;

    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
      dmUpdateRpcIpWhiteUnused(&pDnode->data, pTrans->serverRpc, pRpc);
      return;

    case TDMT_MND_RETRIEVE_IP_WHITE_DUAL_RSP:
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;

    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;

    default:
      break;
  }

  /*
   * pDnode may be NULL here (TD-22618): at trans.c line 91 this callback is
   * registered before the parent pointer is set, so a message that arrives in
   * between is delivered with pDnode still NULL.
   */
  if (pDnode == NULL) {
    code = TSDB_CODE_APP_IS_STARTING;
    goto _OVER;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
      return;
    }
    code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
    goto _OVER;
  }

  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
    // Logged only; processing continues.
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
  }

  if (pMsgHandle->defaultNtype == NODE_END) {
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  // Start from the default node type; the vgId in the head may override it.
  pWrapper = &pDnode->wrappers[pMsgHandle->defaultNtype];
  if (pMsgHandle->needCheckVgId) {
    if (pRpc->contLen <= 0) {
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
      code = TSDB_CODE_INVALID_MSG_LEN;
      goto _OVER;
    }

    const SMsgHead *pHead = pRpc->pCont;
    const int32_t   vgId = ntohl(pHead->vgId);
    switch (vgId) {
      case QNODE_HANDLE:
        pWrapper = &pDnode->wrappers[QNODE];
        break;
      case SNODE_HANDLE:
        pWrapper = &pDnode->wrappers[SNODE];
        break;
      case MNODE_HANDLE:
        pWrapper = &pDnode->wrappers[MNODE];
        break;
      default:
        break;
    }
  }

  code = dmMarkWrapper(pWrapper);
  if (code != 0) {
    // Marking failed: clear the pointer before the release call at _OVER.
    pWrapper = NULL;
    goto _OVER;
  }

  pRpc->info.wrapper = pWrapper;

  // Only requests other than the two sync heartbeat types use RPC_QITEM;
  // responses and heartbeats use DEF_QITEM (rsp msg is not restricted by
  // tsQueueMemoryUsed).
  EQItype itype = DEF_QITEM;
  if (IsReq(pRpc) && pRpc->msgType != TDMT_SYNC_HEARTBEAT && pRpc->msgType != TDMT_SYNC_HEARTBEAT_REPLY) {
    itype = RPC_QITEM;
  }

  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pItem);
  if (code) goto _OVER;

  memcpy(pItem, pRpc, sizeof(SRpcMsg));
  dGDebug("msg:%p, is created, type:%s handle:%p len:%d %" PRIx64 ":%" PRIx64, pItem, TMSG_INFO(pRpc->msgType),
          pItem->info.handle, pRpc->contLen, TRACE_GET_ROOTID(&pItem->info.traceId),
          TRACE_GET_MSGID(&pItem->info.traceId));

  code = dmProcessNodeMsg(pWrapper, pItem);

_OVER:
  if (code != 0) {
    code = dmConvertErrCode(pRpc->msgType, code);
    if (pItem != NULL) {
      dGTrace("msg:%p, failed to process %s since %s", pItem, TMSG_INFO(pItem->msgType), tstrerror(code));
    } else {
      dGTrace("msg:%p, failed to process empty msg since %s", pItem, tstrerror(code));
    }

    if (IsReq(pRpc)) {
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
      }
      // The original had two branches on pWrapper here; both sent the response
      // and logged the same message on failure, so one call covers both.
      dmSendRsp(&rsp);
    } else if (pItem == NULL && IS_STREAM_TRIGGER_RSP_MSG(pRpc->msgType)) {
      destroyAhandle(pRpc->info.ahandle);
      dDebug("msg:%s ahandle freed", TMSG_INFO(pRpc->msgType));
    }

    if (pItem != NULL) {
      dGTrace("msg:%p, is freed", pItem);
      taosFreeQitem(pItem);
    }
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }

  dmReleaseWrapper(pWrapper);
}
325

326
/*
 * Builds the message dispatch tables. For every node type from DNODE up to
 * (not including) NODE_END, the handles returned by the wrapper's
 * getHandlesFp are walked and, per message type:
 *   - the wrapper's msgFps slot is set to the handle's function pointer;
 *   - the transport-level entry either records needCheckVgId, or (when the
 *     handle does not need a vgId check) records this node type as default.
 *
 * Returns 0 on success, -1 if any getHandlesFp returns NULL.
 */
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pHandles); ++i) {
      SMgmtHandle  *pMgmt = taosArrayGet(pHandles, i);
      SDnodeHandle *pEntry = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pEntry->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pEntry->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}
351

352
/*
 * Sends pMsg to pEpSet through the dnode's clientRpc.
 *
 * While the dnode is not running, message types below TDMT_SYNC_MSG_MIN are
 * refused: the payload is freed, pMsg->pCont is cleared, and
 * TSDB_CODE_APP_IS_STARTING or TSDB_CODE_APP_IS_STOPPING is returned depending
 * on the dnode status. Otherwise pMsg->info.handle is reset to 0 and the result
 * of rpcSendRequest is returned (0 on success).
 */
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();
  bool    refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);

  if (!refused) {
    pMsg->info.handle = 0;
    int32_t sendCode = rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
    if (sendCode != 0) {
      dError("failed to send rpc msg");
    }
    return sendCode;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
376
/*
 * Sends pMsg to pEpSet through the dnode's syncRpc.
 *
 * The refusal rule matches dmSendReq: while the dnode is not running, message
 * types below TDMT_SYNC_MSG_MIN have their payload freed and a
 * starting/stopping code is returned. Unlike dmSendReq, this path does not
 * reset pMsg->info.handle and does not log an rpcSendRequest failure; the
 * result is returned as is.
 */
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();
  bool    refused = (pDnode->status != DND_STAT_RUNNING) && (pMsg->msgType < TDMT_SYNC_MSG_MIN);

  if (!refused) {
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  int32_t code = (pDnode->status == DND_STAT_INIT) ? TSDB_CODE_APP_IS_STARTING : TSDB_CODE_APP_IS_STOPPING;
  dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
         pMsg->info.handle);
  return code;
}
394

395
/* Forwards pMsg to rpcRegisterBrokenLinkArg; the result is deliberately discarded. */
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
  (void)rpcRegisterBrokenLinkArg(pMsg);
}
2,524,519✔
396

397
/* Forwards all three arguments to rpcReleaseHandle; the result is deliberately discarded. */
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
  (void)rpcReleaseHandle(pHandle,
                         type,
                         status);
}
2,517,109✔
400

401
/*
 * Callback installed as `rfp` on the RPC clients in this file.
 *
 * Returns true only when `code` is one of the listed network / leadership /
 * lifecycle codes AND `msgType` is not one of the excluded scheduler or TTL
 * message types; false otherwise.
 */
static bool rpcRfp(int32_t code, tmsg_t msgType) {
  bool codeMatches = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                     code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                     code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING ||
                     code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
                     code == TSDB_CODE_APP_IS_STOPPING;
  if (!codeMatches) {
    return false;
  }

  bool excludedType = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY ||
                      msgType == TDMT_VND_DROP_TTL_TABLE;
  return !excludedType;
}
415
/*
 * Callback installed as `noDelayFp` on the main RPC client. Returns true for
 * the four vnode message types listed below, false for everything else.
 */
static bool rpcNoDelayMsg(tmsg_t msgType) {
  return msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS ||
         msgType == TDMT_VND_QUERY_SSMIGRATE_PROGRESS ||
         msgType == TDMT_VND_QUERY_COMPACT_PROGRESS ||
         msgType == TDMT_VND_DROP_TTL_TABLE;
}
422
/*
 * Opens the main RPC client ("DNODE-CLI") and stores it in
 * pDnode->trans.clientRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen returns NULL. A failure to
 * convert td_version is only logged; initialization continues.
 */
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Per-thread share of the session budget, bounded to 10..500 (assuming
  // TMAX/TMIN are the usual max/min macros).
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,
      .dfp = destroyAhandle,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .noDelayFp = rpcNoDelayMsg,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .shareConn = 1,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .notWaitAvaliableConn = 0,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc == NULL) {
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc client is initialized");
  return 0;
}
477
/*
 * Opens the status RPC client ("DNODE-STA-CLI", a single thread, read timer
 * disabled) and stores it in pDnode->trans.statusRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen returns NULL. A failure to
 * convert td_version is only logged; initialization continues.
 */
int32_t dmInitStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Fixed starting value, passed through the same TMAX/TMIN bounds as the
  // other clients.
  int32_t connLimitNum = 100;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-STA-CLI",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 2,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 0,
      .readTimeout = 0,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->statusRpc = rpcOpen(&rpcInit);
  if (pTrans->statusRpc == NULL) {
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc status client is initialized");
  return 0;
}
527

528
/*
 * Opens the sync RPC client ("DNODE-SYNC-CLI") and stores it in
 * pDnode->trans.syncRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen returns NULL. A failure to
 * convert td_version is only logged; initialization continues.
 */
int32_t dmInitSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Half of the main client's per-thread share, bounded to 10..500 (assuming
  // TMAX/TMIN are the usual max/min macros).
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);

  SRpcInit rpcInit = {
      .label = "DNODE-SYNC-CLI",
      .numOfThreads = tsNumOfRpcThreads / 2,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .user = TSDB_DEFAULT_USER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .rfp = rpcRfp,
      .compressSize = tsCompressMsgSize,

      .retryMinInterval = tsRedirectPeriod,
      .retryStepFactor = tsRedirectFactor,
      .retryMaxInterval = tsRedirectMaxPeriod,
      .retryMaxTimeout = tsMaxRetryWaitTime,

      .failFastInterval = 5000,  // interval threshold(ms)
      .failFastThreshold = 3,    // failed threshold
      .ffp = dmFailFastFp,

      .connLimitNum = connLimitNum,
      .connLimitLock = 1,
      .supportBatch = 1,
      .shareConnLimit = tsShareConnLimit * 8,
      .timeToGetConn = tsTimeToGetAvailableConn,
      .startReadTimer = 1,
      .readTimeout = tsReadTimeout,
      .ipv6 = tsEnableIpv6,
  };

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->syncRpc = rpcOpen(&rpcInit);
  if (pTrans->syncRpc == NULL) {
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc sync client is initialized");
  return 0;
}
578

579
/* Closes the main RPC client if it is open and clears the stored handle. */
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->clientRpc == NULL) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}
2,380✔
587
/* Closes the status RPC client if it is open and clears the stored handle. */
void dmCleanupStatusClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->statusRpc == NULL) {
    return;
  }

  rpcClose(pTrans->statusRpc);
  pTrans->statusRpc = NULL;
  dDebug("dnode rpc status client is closed");
}
2,380✔
595
/* Closes the sync RPC client if it is open and clears the stored handle. */
void dmCleanupSyncClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->syncRpc == NULL) {
    return;
  }

  rpcClose(pTrans->syncRpc);
  pTrans->syncRpc = NULL;
  dDebug("dnode rpc sync client is closed");
}
2,380✔
603

604
/*
 * Opens the RPC server ("DND-S") on tsLocalFqdn:tsServerPort and stores it in
 * pDnode->trans.serverRpc.
 *
 * Returns 0 on success, or terrno when rpcOpen returns NULL. A failure to
 * convert td_version is only logged; initialization continues.
 */
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {
      .localPort = tsServerPort,
      .label = "DND-S",
      .numOfThreads = tsNumOfRpcThreads,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = tsMaxShellConns,
      .connType = TAOS_CONN_SERVER,
      .idleTime = tsShellActivityTimer * 1000,
      .parent = pDnode,
      .compressSize = tsCompressMsgSize,
      .shareConnLimit = tsShareConnLimit * 16,
      .ipv6 = tsEnableIpv6,
  };
  // localFqdn is filled by copy rather than in the initializer above.
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);

  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
    dError("failed to convert version string:%s to int", td_version);
  }

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
    return terrno;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}
636

637
/* Closes the RPC server if it is open and clears the stored handle. */
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (pTrans->serverRpc == NULL) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}
2,380✔
645

646
/*
 * Returns, by value, a callback table holding the dnode's four RPC handles,
 * the send/release helpers defined in this file, the dnode-level callbacks,
 * and a pointer to pDnode->data. Every member not set here is zero.
 */
SMsgCb dmGetMsgcb(SDnode *pDnode) {
  SMsgCb msgCb = {0};

  // RPC handles as they are at the time of the call.
  msgCb.clientRpc = pDnode->trans.clientRpc;
  msgCb.serverRpc = pDnode->trans.serverRpc;
  msgCb.statusRpc = pDnode->trans.statusRpc;
  msgCb.syncRpc = pDnode->trans.syncRpc;

  // Transport helpers from this file.
  msgCb.sendReqFp = dmSendReq;
  msgCb.sendSyncReqFp = dmSendSyncReq;
  msgCb.sendRspFp = dmSendRsp;
  msgCb.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  msgCb.releaseHandleFp = dmReleaseHandle;

  // Dnode-level callbacks defined outside this file.
  msgCb.reportStartupFp = dmReportStartup;
  msgCb.updateDnodeInfoFp = dmUpdateDnodeInfo;
  msgCb.getDnodeEpFp = dmGetDnodeEp;

  msgCb.data = &pDnode->data;
  return msgCb;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc