• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.96
/source/libs/transport/src/trans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "transComm.h"
17

18
#ifndef TD_ASTRA_RPC
19
void* (*taosInitHandle[])(SIpAddr* addr, char* label, int32_t numOfThreads, void* fp, void* pInit) = {transInitServer,
20
                                                                                                      transInitClient};
21

22
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
23

24
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
25
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL};
26

27
int (*transReleaseHandle[])(void* handle, int32_t status) = {transReleaseSrvHandle, transReleaseCliHandle};
28

29
static int32_t transValidLocalFqdn(const char* localFqdn, SIpAddr* addr) {
2,388✔
30
  int32_t code = taosGetIpFromFqdn(tsEnableIpv6, localFqdn, addr);
2,388✔
31
  if (code != 0) {
2,388!
32
    return TSDB_CODE_RPC_FQDN_ERROR;
×
33
  }
34
  return 0;
2,388✔
35
}
36
void* rpcOpen(const SRpcInit* pInit) {
14,071✔
37
  int32_t code = rpcInit();
14,071✔
38
  if (code != 0) {
14,071!
39
    TAOS_CHECK_GOTO(code, NULL, _end);
×
40
  }
41

42
  SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
14,071!
43
  if (pRpc == NULL) {
14,071!
44
    TAOS_CHECK_GOTO(terrno, NULL, _end);
×
45
  }
46

47
  pRpc->startReadTimer = pInit->startReadTimer;
14,071✔
48
  if (pInit->label) {
14,071!
49
    int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
14,071!
50
    memcpy(pRpc->label, pInit->label, len);
14,071✔
51
  }
52

53
  pRpc->compressSize = pInit->compressSize;
14,071✔
54
  if (pRpc->compressSize < 0) {
14,071✔
55
    pRpc->compressSize = -1;
13,962✔
56
  }
57

58
  pRpc->encryption = pInit->encryption;
14,071✔
59
  pRpc->compatibilityVer = pInit->compatibilityVer;
14,071✔
60

61
  pRpc->retryMinInterval = pInit->retryMinInterval;  // retry init interval
14,071✔
62
  pRpc->retryStepFactor = pInit->retryStepFactor;
14,071✔
63
  pRpc->retryMaxInterval = pInit->retryMaxInterval;
14,071✔
64
  pRpc->retryMaxTimeout = pInit->retryMaxTimeout;
14,071✔
65

66
  pRpc->failFastThreshold = pInit->failFastThreshold;
14,071✔
67
  pRpc->failFastInterval = pInit->failFastInterval;
14,071✔
68

69
  // register callback handle
70
  pRpc->cfp = pInit->cfp;
14,071✔
71
  pRpc->retry = pInit->rfp;
14,071✔
72
  pRpc->startTimer = pInit->tfp;
14,071✔
73
  pRpc->destroyFp = pInit->dfp;
14,071✔
74
  pRpc->failFastFp = pInit->ffp;
14,071✔
75
  pRpc->noDelayFp = pInit->noDelayFp;
14,071✔
76
  pRpc->connLimitNum = pInit->connLimitNum;
14,071✔
77
  if (pRpc->connLimitNum == 0) {
14,071✔
78
    pRpc->connLimitNum = 20;
2,428✔
79
  }
80

81
  pRpc->connLimitLock = pInit->connLimitLock;
14,071✔
82
  pRpc->supportBatch = pInit->supportBatch;
14,071✔
83
  pRpc->shareConnLimit = pInit->shareConnLimit;
14,071✔
84
  if (pRpc->shareConnLimit <= 0) {
14,071✔
85
    pRpc->shareConnLimit = BUFFER_LIMIT;
2,448✔
86
  }
87

88
  pRpc->readTimeout = pInit->readTimeout;
14,071✔
89
  if (pRpc->readTimeout < 0) {
14,071!
90
    pRpc->readTimeout = INT64_MAX;
×
91
  }
92

93
  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
14,071✔
94
  if (pRpc->numOfThreads <= 0) {
14,071✔
95
    pRpc->numOfThreads = 1;
48✔
96
  }
97

98
  SIpAddr addr = {0};
14,071✔
99
  if (pInit->connType == TAOS_CONN_SERVER) {
14,071✔
100
    if ((code = transValidLocalFqdn(pInit->localFqdn, &addr)) != 0) {
2,388!
101
      tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code));
×
102
      TAOS_CHECK_GOTO(code, NULL, _end);
×
103
    }
104
  }
105
  addr.port = pInit->localPort;
14,071✔
106

107
  pRpc->connType = pInit->connType;
14,071✔
108
  pRpc->idleTime = pInit->idleTime;
14,071✔
109
  pRpc->parent = pInit->parent;
14,071✔
110
  if (pInit->user) {
14,071✔
111
    tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
11,683✔
112
  }
113
  pRpc->timeToGetConn = pInit->timeToGetConn;
14,071✔
114
  if (pRpc->timeToGetConn == 0) {
14,071✔
115
    pRpc->timeToGetConn = 10 * 1000;
2,427✔
116
  }
117
  pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
14,071✔
118
  pRpc->ipv6 = pInit->ipv6;
14,071✔
119

120
  pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(&addr, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
14,071✔
121

122
  if (pRpc->tcphandle == NULL) {
14,071!
123
    tError("failed to init rpc handle");
×
124
    TAOS_CHECK_GOTO(terrno, NULL, _end);
×
125
  }
126

127
  int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
14,071✔
128
  void*   tmp = transAcquireExHandle(transGetInstMgt(), refId);
14,071✔
129
  pRpc->refId = refId;
14,071✔
130

131
  pRpc->shareConn = pInit->shareConn;
14,071✔
132
  return (void*)refId;
14,071✔
133
_end:
×
134
  taosMemoryFree(pRpc);
×
135
  terrno = code;
×
136

137
  return NULL;
×
138
}
139
void rpcClose(void* arg) {
14,071✔
140
  tInfo("start to close rpc");
14,071!
141
  if (arg == NULL) {
14,071!
142
    return;
×
143
  }
144
  transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
14,071✔
145
  transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
14,071✔
146
  tInfo("end to close rpc");
14,071!
147
  return;
14,071✔
148
}
149
void rpcCloseImpl(void* arg) {
14,071✔
150
  if (arg == NULL) return;
14,071!
151
  SRpcInfo* pRpc = (SRpcInfo*)arg;
14,071✔
152
  if (pRpc->tcphandle != NULL) {
14,071!
153
    (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
14,071✔
154
  }
155
  taosMemoryFree(pRpc);
14,071!
156
}
157

158
void* rpcMallocCont(int64_t contLen) {
17,858,166✔
159
  int64_t size = contLen + TRANS_MSG_OVERHEAD;
17,858,166✔
160
  char*   start = taosMemoryCalloc(1, size);
17,858,166!
161
  if (start == NULL) {
17,859,949!
162
    tError("failed to malloc msg, size:%" PRId64, size);
×
163
    return NULL;
×
164
  } else {
165
    tTrace("cont:%p, rpc malloc size:%" PRId64, start, size);
17,859,949✔
166
  }
167

168
  return start + TRANS_MSG_OVERHEAD;
17,860,045✔
169
}
170

171
void rpcFreeCont(void* cont) { transFreeMsg(cont); }
12,954,073✔
172

173
void* rpcReallocCont(void* ptr, int64_t contLen) {
3,367,235✔
174
  if (ptr == NULL) return rpcMallocCont(contLen);
3,367,235✔
175

176
  char*   st = (char*)ptr - TRANS_MSG_OVERHEAD;
1,701,253✔
177
  int64_t sz = contLen + TRANS_MSG_OVERHEAD;
1,701,253✔
178
  char*   nst = taosMemoryRealloc(st, sz);
1,701,253!
179
  if (nst == NULL) {
1,701,082!
180
    taosMemoryFree(st);
×
181
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
182
    return NULL;
×
183
  } else {
184
    st = nst;
1,701,082✔
185
  }
186

187
  return st + TRANS_MSG_OVERHEAD;
1,701,082✔
188
}
189

190
int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
984,820✔
191
  return transSendRequest(pInit, pEpSet, pMsg, NULL);
984,820✔
192
}
193
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
1,194,244✔
194
  if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
1,194,244!
195
    return transSendRequest(pInit, pEpSet, pMsg, pCtx);
295,691✔
196
  } else {
197
    return transSendRequestWithId(pInit, pEpSet, pMsg, pRid);
898,553✔
198
  }
199
}
200

201
int32_t rpcSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
×
202
  return transSendRequestWithId(pInit, pEpSet, pReq, transpointId);
×
203
}
204

205
int32_t rpcSendRecv(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
2,205✔
206
  return transSendRecv(pInit, pEpSet, pMsg, pRsp);
2,205✔
207
}
208
int32_t rpcSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
101,891✔
209
                               int32_t timeoutMs) {
210
  return transSendRecvWithTimeout(pInit, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
101,891✔
211
}
212
int32_t rpcFreeConnById(void* pInit, int64_t connId) { return transFreeConnById(pInit, connId); }
898,463✔
213

214
int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
7,051,396✔
215

216
void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
×
217

218
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
×
219

220
int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
2,552,350✔
221
int32_t rpcReleaseHandle(void* handle, int8_t type, int32_t status) {
2,568,159✔
222
  return (*transReleaseHandle[type])(handle, status);
2,568,159✔
223
}
224

225
// client only
226
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
4,936✔
227
  // later
228
  return transSetDefaultAddr(thandle, ip, fqdn);
4,936✔
229
}
230
// server only
231
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
9,542✔
232

233
int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); }
59,051✔
234

235
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
×
236
int32_t rpcUtilSWhiteListToStr(SIpWhiteListDual* pWhiteList, char** ppBuf) {
×
237
  return transUtilSWhiteListToStr(pWhiteList, ppBuf);
×
238
}
239

240
int32_t rpcCvtErrCode(int32_t code) {
27✔
241
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
27!
242
    return TSDB_CODE_RPC_NETWORK_ERROR;
×
243
  }
244
  return code;
27✔
245
}
246

247
int32_t rpcInit() { return transInit(); }
16,110✔
248

249
void rpcCleanup(void) {
4,424✔
250
  transCleanup();
4,424✔
251
  transHttpEnvDestroy();
4,424✔
252

253
  return;
4,424✔
254
}
255
#else
256
#ifdef TD_ASTRA_RPC
257
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
258
    transInitServer, transInitClient};
259
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
260
int (*transReleaseHandle[])(void* handle, int32_t status) = {transReleaseSrvHandle, transReleaseCliHandle};
261
#else
262
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
263
    transInitServer, transInitClient};
264
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
265
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
266
#endif
267

268
static int32_t transValidLocalFqdn(const char* localFqdn, SIpAddr* ip) { return 0; }
269
typedef struct {
270
  char*    lablset;
271
  RPC_TYPE type;
272
} SLableSet;
273
static SLableSet labelSet[] = {
274
    {"TSC", TD_ASTRA_CLIENT | TD_ASTRA_DSVR},
275
    {"DNODE-CLI", TD_ASTRA_DSVR_CLIENT | TD_ASTRA_DSVR},
276
    {"DNODE-STA-CLI", TD_ASTRA_DSVR_STA_CLIENT | TD_ASTRA_DSVR},
277
    {"DNODE-SYNC-CLI", TD_ASTRA_DSVR_SYNC_CLIENT | TD_ASTRA_DSVR},
278
    {"DND-S", TD_ASTRA_DSVR},
279
};
280

281
void* rpcOpen(const SRpcInit* pInit) {
282
  int32_t code = rpcInit();
283
  if (code != 0) {
284
    TAOS_CHECK_GOTO(code, NULL, _end);
285
  }
286

287
  SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
288
  if (pRpc == NULL) {
289
    TAOS_CHECK_GOTO(terrno, NULL, _end);
290
  }
291
  if (pInit->label) {
292
    int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
293
    memcpy(pRpc->label, pInit->label, len);
294
  }
295

296
  pRpc->compressSize = pInit->compressSize;
297
  if (pRpc->compressSize < 0) {
298
    pRpc->compressSize = -1;
299
  }
300

301
  pRpc->encryption = pInit->encryption;
302
  pRpc->compatibilityVer = pInit->compatibilityVer;
303

304
  pRpc->retryMinInterval = pInit->retryMinInterval;  // retry init interval
305
  pRpc->retryStepFactor = pInit->retryStepFactor;
306
  pRpc->retryMaxInterval = pInit->retryMaxInterval;
307
  pRpc->retryMaxTimeout = pInit->retryMaxTimeout;
308

309
  pRpc->failFastThreshold = pInit->failFastThreshold;
310
  pRpc->failFastInterval = pInit->failFastInterval;
311

312
  // register callback handle
313
  pRpc->cfp = pInit->cfp;
314
  pRpc->retry = pInit->rfp;
315
  pRpc->startTimer = pInit->tfp;
316
  pRpc->destroyFp = pInit->dfp;
317
  pRpc->failFastFp = pInit->ffp;
318
  pRpc->noDelayFp = pInit->noDelayFp;
319
  pRpc->connLimitNum = pInit->connLimitNum;
320
  if (pRpc->connLimitNum == 0) {
321
    pRpc->connLimitNum = 20;
322
  }
323

324
  pRpc->connLimitLock = pInit->connLimitLock;
325
  pRpc->supportBatch = pInit->supportBatch;
326
  pRpc->batchSize = pInit->batchSize;
327

328
  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
329
  if (pRpc->numOfThreads <= 0) {
330
    pRpc->numOfThreads = 1;
331
  }
332

333
  if (pInit->connType == TAOS_CONN_SERVER) {
334
    SIpAddr addr = {0};
335
    if ((code = transValidLocalFqdn(pInit->localFqdn, &ip)) != 0) {
336
      tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code));
337
      TAOS_CHECK_GOTO(code, NULL, _end);
338
    }
339
  }
340

341
  pRpc->connType = pInit->connType;
342
  pRpc->idleTime = pInit->idleTime;
343
  pRpc->parent = pInit->parent;
344
  if (pInit->user) {
345
    tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
346
  }
347
  pRpc->timeToGetConn = pInit->timeToGetConn;
348
  if (pRpc->timeToGetConn == 0) {
349
    pRpc->timeToGetConn = 10 * 1000;
350
  }
351
  pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
352

353
  pRpc->tcphandle =
354
      (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
355

356
  if (pRpc->tcphandle == NULL) {
357
    // tError("failed to init rpc handle");
358
    // TAOS_CHECK_GOTO(terrno, NULL, _end);
359
  }
360
  for (int8_t i = 0; i < sizeof(labelSet) / sizeof(labelSet[0]); i++) {
361
    if (strcmp(labelSet[i].lablset, pRpc->label) == 0) {
362
      pRpc->type = labelSet[i].type;
363
      break;
364
    }
365
  }
366

367
  taosThreadMutexInit(&pRpc->sidMutx, NULL);
368
  pRpc->sidTable = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
369

370
  taosThreadMutexInit(&pRpc->seqMutex, NULL);
371
  pRpc->seqTable = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
372
  pRpc->seq = 1;
373
  transUpdateCb(pRpc->type, pRpc);
374

375
  int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
376
  void*   tmp = transAcquireExHandle(transGetInstMgt(), refId);
377
  pRpc->refId = refId;
378
  return (void*)refId;
379
_end:
380
  taosMemoryFree(pRpc);
381
  terrno = code;
382

383
  return NULL;
384
}
385
void rpcClose(void* arg) {
386
  tInfo("start to close rpc");
387
  return;
388
}
389
void rpcCloseImpl(void* arg) {
390
  if (arg == NULL) return;
391
  return;
392
  // SRpcInfo* pRpc = (SRpcInfo*)arg;
393
  // if (pRpc->tcphandle != NULL) {
394
  //   (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
395
  // }
396
  // taosMemoryFree(pRpc);
397
}
398

399
void* rpcMallocCont(int64_t contLen) {
400
  int64_t size = contLen + TRANS_MSG_OVERHEAD;
401
  char*   start = taosMemoryCalloc(1, size);
402
  if (start == NULL) {
403
    tError("failed to malloc msg, size:%" PRId64, size);
404
    terrno = TSDB_CODE_OUT_OF_MEMORY;
405
    return NULL;
406
  } else {
407
    tTrace("malloc mem:%p size:%" PRId64, start, size);
408
  }
409

410
  return start + sizeof(STransMsgHead);
411
}
412

413
void rpcFreeCont(void* cont) { transFreeMsg(cont); }
414

415
void* rpcReallocCont(void* ptr, int64_t contLen) {
416
  if (ptr == NULL) return rpcMallocCont(contLen);
417

418
  char*   st = (char*)ptr - TRANS_MSG_OVERHEAD;
419
  int64_t sz = contLen + TRANS_MSG_OVERHEAD;
420
  char*   nst = taosMemoryRealloc(st, sz);
421
  if (nst == NULL) {
422
    taosMemoryFree(st);
423
    terrno = TSDB_CODE_OUT_OF_MEMORY;
424
    return NULL;
425
  } else {
426
    st = nst;
427
  }
428

429
  return st + TRANS_MSG_OVERHEAD;
430
}
431

432
int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
433
  return transSendRequest(shandle, pEpSet, pMsg, NULL);
434
#ifdef TD_ASTRA_RPC
435
  // return transSendRequest2(shandle, pEpSet, pMsg, NULL);
436
#else
437
  return transSendRequest(shandle, pEpSet, pMsg, NULL);
438
#endif
439
}
440
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
441
  if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
442
    return transSendRequest(shandle, pEpSet, pMsg, pCtx);
443
  } else {
444
    return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
445
  }
446
#ifdef TD_ASTRA_RPC
447
  // if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
448
  //   return transSendRequest2(shandle, pEpSet, pMsg, pCtx);
449
  // } else {
450
  //   return transSendRequestWithId2(shandle, pEpSet, pMsg, pRid);
451
  // }
452
#else
453
  if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
454
    return transSendRequest(shandle, pEpSet, pMsg, pCtx);
455
  } else {
456
    return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
457
  }
458
#endif
459
}
460

461
int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
462
  return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
463
#ifdef TD_ASTRA_RPC
464
  // return transSendRequestWithId2(shandle, pEpSet, pReq, transpointId);
465
#else
466
  return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
467
#endif
468
}
469

470
int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
471
  return transSendRecv(shandle, pEpSet, pMsg, pRsp);
472
#ifdef TD_ASTRA_RPC
473
  // return transSendRecv2(shandle, pEpSet, pMsg, pRsp);
474
#else
475
  return transSendRecv(shandle, pEpSet, pMsg, pRsp);
476
#endif
477
}
478
int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
479
                               int32_t timeoutMs) {
480
  return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
481
#ifdef TD_ASTRA_RPC
482
  // return transSendRecvWithTimeout2(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
483
#else
484
  return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
485
#endif
486
}
487
int32_t rpcFreeConnById(void* shandle, int64_t connId) {
488
  return transFreeConnById(shandle, connId);
489
#ifdef TD_ASTRA_RPC
490
  // return transFreeConnById2(shandle, connId);
491
#else
492
  return transFreeConnById(shandle, connId);
493
#endif
494
}
495

496
int32_t rpcSendResponse(SRpcMsg* pMsg) {
497
  return transSendResponse(pMsg);
498
#ifdef TD_ASTRA_RPC
499
  // return transSendResponse2(pMsg);
500
#else
501
  return transSendResponse(pMsg);
502
#endif
503
}
504

505
int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) {
506
  return transRegisterMsg(msg);
507
#ifdef TD_ASTRA_RPC
508
  // return transRegisterMsg2(msg);
509
#else
510
  return transRegisterMsg(msg);
511
#endif
512
}
513
int32_t rpcReleaseHandle(void* handle, int8_t type, int32_t status) {
514
  return (*transReleaseHandle[type])(handle, status);
515
}
516

517
// client only
518
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
519
  // later
520
  return transSetDefaultAddr(thandle, ip, fqdn);
521
#ifdef TD_ASTRA_RPC
522
  // return transSetDefaultAddr2(thandle, ip, fqdn);
523
#else
524
  return transSetDefaultAddr(thandle, ip, fqdn);
525
#endif
526
}
527
// server only
528
int32_t rpcSetIpWhite(void* thandle, void* arg) {
529
  return transSetIpWhiteList(thandle, arg, NULL);
530
#ifdef TD_ASTRA_RPC
531
  // return transSetIpWhiteList2(thandle, arg, NULL);
532
#else
533
  return transSetIpWhiteList(thandle, arg, NULL);
534
#endif
535
}
536

537
int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); }
538

539
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
540
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
541
  return transUtilSWhiteListToStr(pWhiteList, ppBuf);
542
}
543

544
int32_t rpcCvtErrCode(int32_t code) {
545
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
546
    return TSDB_CODE_RPC_NETWORK_ERROR;
547
  }
548
  return code;
549
}
550

551
int32_t rpcInit() { return transInit(); }
552

553
void rpcCleanup(void) {
554
  transCleanup();
555
  // transHttpEnvDestroy();
556

557
  return;
558
}
559

560
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc