• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.9
/source/dnode/mgmt/node_mgmt/src/dmTransport.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "qworker.h"
19
#include "tanalytics.h"
20
#include "tversion.h"
21

UNCOV
22
static inline void dmSendRsp(SRpcMsg *pMsg) {
×
UNCOV
23
  if (rpcSendResponse(pMsg) != 0) {
×
24
    dError("failed to send response, msg:%p", pMsg);
×
25
  }
UNCOV
26
}
×
27

UNCOV
28
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
×
UNCOV
29
  SEpSet epSet = {0};
×
UNCOV
30
  dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
×
31

UNCOV
32
  if (epSet.numOfEps <= 1) {
×
UNCOV
33
    if (epSet.numOfEps == 0) {
×
34
      pMsg->pCont = NULL;
×
35
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
36
      return;
×
37
    }
38
    // dnode is not the mnode or mnode leader  and This ensures that the function correctly handles cases where the
39
    // dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
UNCOV
40
    if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
×
41
      pMsg->pCont = NULL;
×
42
      pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
×
43
      return;
×
44
    }
45
  }
46

UNCOV
47
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
×
UNCOV
48
  pMsg->pCont = rpcMallocCont(contLen);
×
UNCOV
49
  if (pMsg->pCont == NULL) {
×
50
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
51
  } else {
UNCOV
52
    contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
×
UNCOV
53
    if (contLen < 0) {
×
54
      pMsg->code = contLen;
×
55
      return;
×
56
    }
UNCOV
57
    pMsg->contLen = contLen;
×
58
  }
59
}
60

61
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
349✔
62
  const STraceId *trace = &pMsg->info.traceId;
349✔
63

64
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
349✔
65
  if (msgFp == NULL) {
349!
66
    // terrno = TSDB_CODE_MSG_NOT_PROCESSED;
67
    dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
×
68
    return TSDB_CODE_MSG_NOT_PROCESSED;
×
69
  }
70

71
  dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
349!
72
  pMsg->info.wrapper = pWrapper;
349✔
73
  return (*msgFp)(pWrapper->pMgmt, pMsg);
349✔
74
}
75

76
static bool dmFailFastFp(tmsg_t msgType) {
×
77
  // add more msg type later
78
  return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
×
79
}
80

UNCOV
81
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
×
UNCOV
82
  if (code != TSDB_CODE_APP_IS_STOPPING) {
×
UNCOV
83
    return code;
×
84
  }
UNCOV
85
  if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
×
UNCOV
86
      (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
×
UNCOV
87
    code = TSDB_CODE_VND_STOPPED;
×
88
  }
UNCOV
89
  return code;
×
90
}
UNCOV
91
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
UNCOV
92
  int32_t        code = 0;
×
UNCOV
93
  SUpdateIpWhite ipWhite = {0};  // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
×
UNCOV
94
  code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
×
UNCOV
95
  if (code < 0) {
×
96
    dError("failed to update rpc ip-white since: %s", tstrerror(code));
×
97
    return;
×
98
  }
UNCOV
99
  code = rpcSetIpWhite(pTrans, &ipWhite);
×
UNCOV
100
  pData->ipWhiteVer = ipWhite.ver;
×
101

UNCOV
102
  (void)tFreeSUpdateIpWhiteReq(&ipWhite);
×
103

UNCOV
104
  rpcFreeCont(pRpc->pCont);
×
105
}
106
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
349✔
107
  if (forbidden) {
349!
108
    SIpV4Range range = {.ip = clientIp, .mask = 32};
×
109
    char       buf[36] = {0};
×
110

111
    (void)rpcUtilSIpRangeToStr(&range, buf);
×
112
    dError("User:%s host:%s not in ip white list", user, buf);
×
113
    return true;
×
114
  } else {
115
    return false;
349✔
116
  }
117
}
118

UNCOV
119
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
×
UNCOV
120
  SRetrieveAnalAlgoRsp rsp = {0};
×
UNCOV
121
  if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
×
UNCOV
122
    taosAnalyUpdate(rsp.ver, rsp.hash);
×
UNCOV
123
    rsp.hash = NULL;
×
124
  }
UNCOV
125
  tFreeRetrieveAnalAlgoRsp(&rsp);
×
UNCOV
126
  rpcFreeCont(pRpc->pCont);
×
UNCOV
127
}
×
128

129
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
349✔
130
  SDnodeTrans  *pTrans = &pDnode->trans;
349✔
131
  int32_t       code = -1;
349✔
132
  SRpcMsg      *pMsg = NULL;
349✔
133
  SMgmtWrapper *pWrapper = NULL;
349✔
134
  SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
349✔
135

136
  const STraceId *trace = &pRpc->info.traceId;
349✔
137
  dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
349!
138
          pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
139

140
  int32_t svrVer = 0;
349✔
141
  code = taosVersionStrToInt(td_version, &svrVer);
349✔
142
  if (code != 0) {
349!
143
    dError("failed to convert version string:%s to int, code:%d", td_version, code);
×
144
    goto _OVER;
×
145
  }
146
  if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
349!
147
    dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
×
148
           pRpc->info.conn.clientIp);
149
    goto _OVER;
×
150
  }
151

152
  bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
349✔
153
  if (isForbidden) {
349!
154
    code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
×
155
    goto _OVER;
×
156
  }
157

158
  switch (pRpc->msgType) {
349!
159
    case TDMT_DND_NET_TEST:
×
160
      dmProcessNetTestReq(pDnode, pRpc);
×
UNCOV
161
      return;
×
UNCOV
162
    case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
×
163
    case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
164
    case TDMT_SCH_FETCH_RSP:
165
    case TDMT_SCH_MERGE_FETCH_RSP:
166
    case TDMT_VND_SUBMIT_RSP:
UNCOV
167
      code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
×
UNCOV
168
      return;
×
169
    case TDMT_MND_STATUS_RSP:
×
170
      if (pEpSet != NULL) {
×
171
        dmSetMnodeEpSet(&pDnode->data, pEpSet);
×
172
      }
173
      break;
×
UNCOV
174
    case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
×
UNCOV
175
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
×
UNCOV
176
      return;
×
UNCOV
177
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
×
UNCOV
178
      dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
×
UNCOV
179
      return;
×
180
    default:
349✔
181
      break;
349✔
182
  }
183

184
  /*
185
  pDnode is null, TD-22618
186
  at trans.c line 91
187
  before this line, dmProcessRpcMsg callback is set
188
  after this line, parent is set
189
  so when dmProcessRpcMsg is called, pDonde is still null.
190
  */
191
  if (pDnode != NULL) {
349!
192
    if (pDnode->status != DND_STAT_RUNNING) {
349!
UNCOV
193
      if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
×
194
        dmProcessServerStartupStatus(pDnode, pRpc);
×
195
        return;
×
196
      } else {
UNCOV
197
        if (pDnode->status == DND_STAT_INIT) {
×
UNCOV
198
          code = TSDB_CODE_APP_IS_STARTING;
×
199
        } else {
UNCOV
200
          code = TSDB_CODE_APP_IS_STOPPING;
×
201
        }
UNCOV
202
        goto _OVER;
×
203
      }
204
    }
205
  } else {
UNCOV
206
    code = TSDB_CODE_APP_IS_STARTING;
×
UNCOV
207
    goto _OVER;
×
208
  }
209

210
  if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
349!
211
    dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
×
212
    code = TSDB_CODE_INVALID_MSG_LEN;
×
213
    goto _OVER;
×
214
  } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
349!
UNCOV
215
             (!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
×
UNCOV
216
    dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
×
217
  }
218

219
  if (pHandle->defaultNtype == NODE_END) {
349!
220
    dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
×
221
    code = TSDB_CODE_MSG_NOT_PROCESSED;
×
222
    goto _OVER;
×
223
  }
224

225
  pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
349✔
226
  if (pHandle->needCheckVgId) {
349✔
227
    if (pRpc->contLen > 0) {
16!
228
      const SMsgHead *pHead = pRpc->pCont;
16✔
229
      const int32_t   vgId = ntohl(pHead->vgId);
16✔
230
      switch (vgId) {
16!
UNCOV
231
        case QNODE_HANDLE:
×
UNCOV
232
          pWrapper = &pDnode->wrappers[QNODE];
×
UNCOV
233
          break;
×
UNCOV
234
        case SNODE_HANDLE:
×
UNCOV
235
          pWrapper = &pDnode->wrappers[SNODE];
×
UNCOV
236
          break;
×
UNCOV
237
        case MNODE_HANDLE:
×
UNCOV
238
          pWrapper = &pDnode->wrappers[MNODE];
×
UNCOV
239
          break;
×
240
        default:
16✔
241
          break;
16✔
242
      }
243
    } else {
244
      dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
×
245
      code = TSDB_CODE_INVALID_MSG_LEN;
×
246
      goto _OVER;
×
247
    }
248
  }
249

250
  if ((code = dmMarkWrapper(pWrapper)) != 0) {
349!
UNCOV
251
    pWrapper = NULL;
×
UNCOV
252
    goto _OVER;
×
253
  }
254

255
  pRpc->info.wrapper = pWrapper;
349✔
256

257
  EQItype itype = RPC_QITEM;  // rsp msg is not restricted by tsQueueMemoryUsed
349✔
258
  if (IsReq(pRpc) && pRpc->msgType != TDMT_SYNC_HEARTBEAT && pRpc->msgType != TDMT_SYNC_HEARTBEAT_REPLY)
349!
259
    itype = RPC_QITEM;
259✔
260
  code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
349✔
261
  if (code) goto _OVER;
349!
262

263
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
349✔
264
  dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
349!
265
          pRpc->contLen);
266

267
  code = dmProcessNodeMsg(pWrapper, pMsg);
349✔
268

269
_OVER:
349✔
270
  if (code != 0) {
349!
UNCOV
271
    code = dmConvertErrCode(pRpc->msgType, code);
×
UNCOV
272
    if (pMsg) {
×
UNCOV
273
      dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
×
274
    } else {
UNCOV
275
      dGTrace("msg:%p, failed to process empty msg since %s", pMsg, tstrerror(code));
×
276
    }
277

UNCOV
278
    if (IsReq(pRpc)) {
×
UNCOV
279
      SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
×
UNCOV
280
      if (code == TSDB_CODE_MNODE_NOT_FOUND) {
×
UNCOV
281
        dmBuildMnodeRedirectRsp(pDnode, &rsp);
×
282
      }
283

UNCOV
284
      if (pWrapper != NULL) {
×
UNCOV
285
        dmSendRsp(&rsp);
×
286
      } else {
UNCOV
287
        if (rpcSendResponse(&rsp) != 0) {
×
288
          dError("failed to send response, msg:%p", &rsp);
×
289
        }
290
      }
291
    }
292

UNCOV
293
    if (pMsg != NULL) {
×
UNCOV
294
      dGTrace("msg:%p, is freed", pMsg);
×
UNCOV
295
      taosFreeQitem(pMsg);
×
296
    }
UNCOV
297
    rpcFreeCont(pRpc->pCont);
×
UNCOV
298
    pRpc->pCont = NULL;
×
299
  }
300

301
  dmReleaseWrapper(pWrapper);
349✔
302
}
303

304
int32_t dmInitMsgHandle(SDnode *pDnode) {
8✔
305
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
306

307
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
48✔
308
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
40✔
309
    SArray       *pArray = (*pWrapper->func.getHandlesFp)();
40✔
310
    if (pArray == NULL) return -1;
40!
311

312
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
2,784✔
313
      SMgmtHandle  *pMgmt = taosArrayGet(pArray, i);
2,744✔
314
      SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
2,744✔
315
      if (pMgmt->needCheckVgId) {
2,744✔
316
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
472✔
317
      }
318
      if (!pMgmt->needCheckVgId) {
2,744✔
319
        pHandle->defaultNtype = ntype;
2,272✔
320
      }
321
      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
2,744✔
322
    }
323

324
    taosArrayDestroy(pArray);
40✔
325
  }
326

327
  return 0;
8✔
328
}
329

330
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
90✔
331
  int32_t code = 0;
90✔
332
  SDnode *pDnode = dmInstance();
90✔
333
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
90!
UNCOV
334
    rpcFreeCont(pMsg->pCont);
×
UNCOV
335
    pMsg->pCont = NULL;
×
UNCOV
336
    if (pDnode->status == DND_STAT_INIT) {
×
337
      code = TSDB_CODE_APP_IS_STARTING;
×
338
    } else {
UNCOV
339
      code = TSDB_CODE_APP_IS_STOPPING;
×
340
    }
UNCOV
341
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
342
           pMsg->info.handle);
UNCOV
343
    return code;
×
344
  } else {
345
    pMsg->info.handle = 0;
90✔
346
    if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
90!
UNCOV
347
      dError("failed to send rpc msg");
×
348
    }
349
    return 0;
90✔
350
  }
351
}
UNCOV
352
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
×
UNCOV
353
  int32_t code = 0;
×
UNCOV
354
  SDnode *pDnode = dmInstance();
×
UNCOV
355
  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
×
356
    rpcFreeCont(pMsg->pCont);
×
357
    pMsg->pCont = NULL;
×
358
    if (pDnode->status == DND_STAT_INIT) {
×
359
      code = TSDB_CODE_APP_IS_STARTING;
×
360
    } else {
361
      code = TSDB_CODE_APP_IS_STOPPING;
×
362
    }
363
    dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
×
364
           pMsg->info.handle);
365
    return code;
×
366
  } else {
UNCOV
367
    return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
×
368
  }
369
}
370

UNCOV
371
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
×
372

UNCOV
373
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
×
UNCOV
374
  (void)rpcReleaseHandle(pHandle, type, status);
×
UNCOV
375
}
×
376

UNCOV
377
static bool rpcRfp(int32_t code, tmsg_t msgType) {
×
UNCOV
378
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
×
UNCOV
379
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
×
UNCOV
380
      code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
×
381
      code == TSDB_CODE_APP_IS_STOPPING) {
UNCOV
382
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
×
UNCOV
383
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY || msgType == TDMT_VND_DROP_TTL_TABLE) {
×
384
      return false;
×
385
    }
UNCOV
386
    return true;
×
387
  } else {
UNCOV
388
    return false;
×
389
  }
390
}
391
static bool rpcNoDelayMsg(tmsg_t msgType) {
×
392
  if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
×
393
      msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
×
394
    return true;
×
395
  }
396
  return false;
×
397
}
398
int32_t dmInitClient(SDnode *pDnode) {
8✔
399
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
400

401
  SRpcInit rpcInit = {0};
8✔
402
  rpcInit.label = "DNODE-CLI";
8✔
403
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
8✔
404
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
8✔
405
  rpcInit.sessions = 1024;
8✔
406
  rpcInit.connType = TAOS_CONN_CLIENT;
8✔
407
  rpcInit.user = TSDB_DEFAULT_USER;
8✔
408
  rpcInit.idleTime = tsShellActivityTimer * 1000;
8✔
409
  rpcInit.parent = pDnode;
8✔
410
  rpcInit.rfp = rpcRfp;
8✔
411
  rpcInit.compressSize = tsCompressMsgSize;
8✔
412
  rpcInit.dfp = destroyAhandle;
8✔
413

414
  rpcInit.retryMinInterval = tsRedirectPeriod;
8✔
415
  rpcInit.retryStepFactor = tsRedirectFactor;
8✔
416
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
8✔
417
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
8✔
418

419
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
8✔
420
  rpcInit.failFastThreshold = 3;    // failed threshold
8✔
421
  rpcInit.ffp = dmFailFastFp;
8✔
422

423
  rpcInit.noDelayFp = rpcNoDelayMsg;
8✔
424

425
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
8✔
426
  connLimitNum = TMAX(connLimitNum, 10);
8✔
427
  connLimitNum = TMIN(connLimitNum, 500);
8✔
428

429
  rpcInit.connLimitNum = connLimitNum;
8✔
430
  rpcInit.connLimitLock = 1;
8✔
431
  rpcInit.supportBatch = 1;
8✔
432
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
8✔
433
  rpcInit.shareConn = 1;
8✔
434
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
8✔
435
  rpcInit.notWaitAvaliableConn = 0;
8✔
436
  rpcInit.startReadTimer = 1;
8✔
437
  rpcInit.readTimeout = tsReadTimeout;
8✔
438

439
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
8!
440
    dError("failed to convert version string:%s to int", td_version);
×
441
  }
442

443
  pTrans->clientRpc = rpcOpen(&rpcInit);
8✔
444
  if (pTrans->clientRpc == NULL) {
8!
445
    dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
×
446
    return terrno;
×
447
  }
448

449
  dDebug("dnode rpc client is initialized");
8!
450
  return 0;
8✔
451
}
452
int32_t dmInitStatusClient(SDnode *pDnode) {
8✔
453
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
454

455
  SRpcInit rpcInit = {0};
8✔
456
  rpcInit.label = "DNODE-STA-CLI";
8✔
457
  rpcInit.numOfThreads = 1;
8✔
458
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
8✔
459
  rpcInit.sessions = 1024;
8✔
460
  rpcInit.connType = TAOS_CONN_CLIENT;
8✔
461
  rpcInit.user = TSDB_DEFAULT_USER;
8✔
462
  rpcInit.idleTime = tsShellActivityTimer * 1000;
8✔
463
  rpcInit.parent = pDnode;
8✔
464
  rpcInit.rfp = rpcRfp;
8✔
465
  rpcInit.compressSize = tsCompressMsgSize;
8✔
466

467
  rpcInit.retryMinInterval = tsRedirectPeriod;
8✔
468
  rpcInit.retryStepFactor = tsRedirectFactor;
8✔
469
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
8✔
470
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
8✔
471

472
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
8✔
473
  rpcInit.failFastThreshold = 3;    // failed threshold
8✔
474
  rpcInit.ffp = dmFailFastFp;
8✔
475

476
  int32_t connLimitNum = 100;
8✔
477
  connLimitNum = TMAX(connLimitNum, 10);
8✔
478
  connLimitNum = TMIN(connLimitNum, 500);
8✔
479

480
  rpcInit.connLimitNum = connLimitNum;
8✔
481
  rpcInit.connLimitLock = 1;
8✔
482
  rpcInit.supportBatch = 1;
8✔
483
  rpcInit.shareConnLimit = tsShareConnLimit * 2;
8✔
484
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
8✔
485
  rpcInit.startReadTimer = 0;
8✔
486
  rpcInit.readTimeout = 0;
8✔
487

488
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
8!
489
    dError("failed to convert version string:%s to int", td_version);
×
490
  }
491

492
  pTrans->statusRpc = rpcOpen(&rpcInit);
8✔
493
  if (pTrans->statusRpc == NULL) {
8!
494
    dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
×
495
    return terrno;
×
496
  }
497

498
  dDebug("dnode rpc status client is initialized");
8!
499
  return 0;
8✔
500
}
501

502
int32_t dmInitSyncClient(SDnode *pDnode) {
8✔
503
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
504

505
  SRpcInit rpcInit = {0};
8✔
506
  rpcInit.label = "DNODE-SYNC-CLI";
8✔
507
  rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
8✔
508
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
8✔
509
  rpcInit.sessions = 1024;
8✔
510
  rpcInit.connType = TAOS_CONN_CLIENT;
8✔
511
  rpcInit.user = TSDB_DEFAULT_USER;
8✔
512
  rpcInit.idleTime = tsShellActivityTimer * 1000;
8✔
513
  rpcInit.parent = pDnode;
8✔
514
  rpcInit.rfp = rpcRfp;
8✔
515
  rpcInit.compressSize = tsCompressMsgSize;
8✔
516

517
  rpcInit.retryMinInterval = tsRedirectPeriod;
8✔
518
  rpcInit.retryStepFactor = tsRedirectFactor;
8✔
519
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
8✔
520
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
8✔
521

522
  rpcInit.failFastInterval = 5000;  // interval threshold(ms)
8✔
523
  rpcInit.failFastThreshold = 3;    // failed threshold
8✔
524
  rpcInit.ffp = dmFailFastFp;
8✔
525

526
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
8✔
527
  connLimitNum = TMAX(connLimitNum, 10);
8✔
528
  connLimitNum = TMIN(connLimitNum, 500);
8✔
529

530
  rpcInit.connLimitNum = connLimitNum;
8✔
531
  rpcInit.connLimitLock = 1;
8✔
532
  rpcInit.supportBatch = 1;
8✔
533
  rpcInit.shareConnLimit = tsShareConnLimit * 8;
8✔
534
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
8✔
535
  rpcInit.startReadTimer = 1;
8✔
536
  rpcInit.readTimeout = tsReadTimeout;
8✔
537

538
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
8!
539
    dError("failed to convert version string:%s to int", td_version);
×
540
  }
541

542
  pTrans->syncRpc = rpcOpen(&rpcInit);
8✔
543
  if (pTrans->syncRpc == NULL) {
8!
544
    dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));
×
545
    return terrno;
×
546
  }
547

548
  dDebug("dnode rpc sync client is initialized");
8!
549
  return 0;
8✔
550
}
551

552
void dmCleanupClient(SDnode *pDnode) {
8✔
553
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
554
  if (pTrans->clientRpc) {
8!
555
    rpcClose(pTrans->clientRpc);
8✔
556
    pTrans->clientRpc = NULL;
8✔
557
    dDebug("dnode rpc client is closed");
8!
558
  }
559
}
8✔
560
void dmCleanupStatusClient(SDnode *pDnode) {
8✔
561
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
562
  if (pTrans->statusRpc) {
8!
563
    rpcClose(pTrans->statusRpc);
8✔
564
    pTrans->statusRpc = NULL;
8✔
565
    dDebug("dnode rpc status client is closed");
8!
566
  }
567
}
8✔
568
void dmCleanupSyncClient(SDnode *pDnode) {
8✔
569
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
570
  if (pTrans->syncRpc) {
8!
571
    rpcClose(pTrans->syncRpc);
8✔
572
    pTrans->syncRpc = NULL;
8✔
573
    dDebug("dnode rpc sync client is closed");
8!
574
  }
575
}
8✔
576

577
int32_t dmInitServer(SDnode *pDnode) {
8✔
578
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
579

580
  SRpcInit rpcInit = {0};
8✔
581
  tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
8✔
582
  rpcInit.localPort = tsServerPort;
8✔
583
  rpcInit.label = "DND-S";
8✔
584
  rpcInit.numOfThreads = tsNumOfRpcThreads;
8✔
585
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
8✔
586
  rpcInit.sessions = tsMaxShellConns;
8✔
587
  rpcInit.connType = TAOS_CONN_SERVER;
8✔
588
  rpcInit.idleTime = tsShellActivityTimer * 1000;
8✔
589
  rpcInit.parent = pDnode;
8✔
590
  rpcInit.compressSize = tsCompressMsgSize;
8✔
591
  rpcInit.shareConnLimit = tsShareConnLimit * 16;
8✔
592

593
  if (taosVersionStrToInt(td_version, &rpcInit.compatibilityVer) != 0) {
8!
594
    dError("failed to convert version string:%s to int", td_version);
×
595
  }
596

597
  pTrans->serverRpc = rpcOpen(&rpcInit);
8✔
598
  if (pTrans->serverRpc == NULL) {
8!
UNCOV
599
    dError("failed to init dnode rpc server since:%s", tstrerror(terrno));
×
UNCOV
600
    return terrno;
×
601
  }
602

603
  dDebug("dnode rpc server is initialized");
8!
604
  return 0;
8✔
605
}
606

607
void dmCleanupServer(SDnode *pDnode) {
8✔
608
  SDnodeTrans *pTrans = &pDnode->trans;
8✔
609
  if (pTrans->serverRpc) {
8!
610
    rpcClose(pTrans->serverRpc);
8✔
611
    pTrans->serverRpc = NULL;
8✔
612
    dDebug("dnode rpc server is closed");
8!
613
  }
614
}
8✔
615

616
SMsgCb dmGetMsgcb(SDnode *pDnode) {
70✔
617
  SMsgCb msgCb = {
70✔
618
      .clientRpc = pDnode->trans.clientRpc,
70✔
619
      .serverRpc = pDnode->trans.serverRpc,
70✔
620
      .statusRpc = pDnode->trans.statusRpc,
70✔
621
      .syncRpc = pDnode->trans.syncRpc,
70✔
622
      .sendReqFp = dmSendReq,
623
      .sendSyncReqFp = dmSendSyncReq,
624
      .sendRspFp = dmSendRsp,
625
      .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
626
      .releaseHandleFp = dmReleaseHandle,
627
      .reportStartupFp = dmReportStartup,
628
      .updateDnodeInfoFp = dmUpdateDnodeInfo,
629
      .getDnodeEpFp = dmGetDnodeEp,
630
      .data = &pDnode->data,
70✔
631
  };
632
  return msgCb;
70✔
633
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc