• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.27
/source/libs/transport/src/transSvr.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 * * This program is free software: you can use, redistribute, and/or modify
4
 * it under the terms of the GNU Affero General Public License, version 3
5
 * or later ("AGPL"), as published by the Free Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful, but WITHOUT
8
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
9
 * FITNESS FOR A PARTICULAR PURPOSE.
10
 *
11
 * You should have received a copy of the GNU Affero General Public License
12
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
13
 */
14

15
#include "transComm.h"
16
#include "transLog.h"
17

18
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
19

20
#ifndef TD_ASTRA_RPC
21
static char* notify = "a";
22

23
typedef struct {
24
  int       notifyCount;  //
25
  int       init;         // init or not
26
  STransMsg msg;
27
} SSvrRegArg;
28

29
typedef struct SSvrConn {
30
  int32_t    ref;
31
  uv_tcp_t*  pTcp;
32
  uv_timer_t pTimer;
33

34
  queue       queue;
35
  SConnBuffer readBuf;  // read buf,
36
  int         inType;
37
  void*       pInst;    // rpc init
38
  void*       ahandle;  //
39
  void*       hostThrd;
40
  STransQueue resps;
41

42
  // SSvrRegArg regArg;
43
  bool broken;  // conn broken;
44

45
  ConnStatus status;
46

47
  SIpAddr  serverIp;
48
  SIpAddr  clientIp;
49
  uint16_t port;
50

51
  char src[IP_RESERVE_CAP];
52
  char dst[IP_RESERVE_CAP];
53

54
  int64_t refId;
55
  int     spi;
56
  char    info[64];
57
  char    user[TSDB_UNI_LEN];  // user ID for the link
58
  int8_t  userInited;
59
  char    secret[TSDB_PASSWORD_LEN];
60
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
61

62
  int64_t whiteListVer;
63

64
  // state req dict
65
  SHashObj* pQTable;
66
  uv_buf_t* buf;
67
  int32_t   bufSize;
68
  queue     wq;  // uv_write_t queue
69
} SSvrConn;
70

71
typedef struct SSvrRespMsg {
72
  SSvrConn*     pConn;
73
  STransMsg     msg;
74
  queue         q;
75
  STransMsgType type;
76
  int64_t       seqNum;
77
  void*         arg;
78
  FilteFunc     func;
79
  int8_t        sent;
80

81
} SSvrRespMsg;
82

83
typedef struct {
84
  int64_t           ver;
85
  SIpWhiteListDual* pList;
86

87
} SWhiteUserList;
88
typedef struct {
89
  SHashObj* pList;
90
  int64_t   ver;
91
} SIpWhiteListTab;
92
typedef struct SWorkThrd {
93
  TdThread     thread;
94
  uv_connect_t connect_req;
95
  uv_pipe_t*   pipe;
96
  uv_os_fd_t   fd;
97
  uv_loop_t*   loop;
98
  SAsyncPool*  asyncPool;
99
  queue        msg;
100

101
  queue conn;
102
  void* pInst;
103
  bool  quit;
104

105
  SIpWhiteListTab* pWhiteList;
106
  int64_t          whiteListVer;
107
  int8_t           enableIpWhiteList;
108

109
  int32_t connRefMgt;
110

111
  int8_t inited;
112
} SWorkThrd;
113

114
typedef struct SServerObj {
115
  TdThread   thread;
116
  uv_tcp_t   server;
117
  uv_loop_t* loop;
118

119
  // work thread info
120
  int         workerIdx;
121
  int         numOfThreads;
122
  int         numOfWorkerReady;
123
  SWorkThrd** pThreadObj;
124

125
  uv_pipe_t   pipeListen;
126
  uv_pipe_t** pipe;
127
  uint32_t    ip;
128
  uint32_t    port;
129
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
130
  SIpAddr     addr;
131
  bool        inited;
132
  int8_t      ipv6;
133
} SServerObj;
134

135
SIpWhiteListTab* uvWhiteListCreate();
136
void             uvWhiteListDestroy(SIpWhiteListTab* pWhite);
137
int32_t          uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteListDual* pList, int64_t ver);
138
void             uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
139
bool             uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
140
bool             uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, SIpAddr* pIp, int64_t ver);
141
void             uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn);
142

143
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
144
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
145
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
146
static void uvOnTimeoutCb(uv_timer_t* handle);
147
static void uvOnSendCb(uv_write_t* req, int status);
148
static void uvOnPipeWriteCb(uv_write_t* req, int status);
149
static void uvOnAcceptCb(uv_stream_t* stream, int status);
150
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
151
static void uvWorkerAsyncCb(uv_async_t* handle);
152
static void uvAcceptAsyncCb(uv_async_t* handle);
153
static void uvShutDownCb(uv_shutdown_t* req, int status);
154

155
/*
156
 * time-consuming tasks are thrown into the BG work thread
157
 */
158
static void uvWorkDoTask(uv_work_t* req);
159
static void uvWorkAfterTask(uv_work_t* req, int status);
160

161
static void uvWalkCb(uv_handle_t* handle, void* arg);
162
static void uvFreeCb(uv_handle_t* handle);
163

164
static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg);
165

166
static int32_t uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb);
167
static void    uvStartSendResp(SSvrRespMsg* msg);
168

169
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
170

171
static FORCE_INLINE void      destroySmsg(SSvrRespMsg* smsg);
172
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
173
static FORCE_INLINE void      destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
174

175
// Returns the worker's connRefMgt value, or -1 when `thrd` is NULL.
int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) {
  if (thrd == NULL) {
    return -1;
  }
  return thrd->connRefMgt;
}
42,733,043!
176

177
static void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd);
178
static void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd);
179
static void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd);
180
static void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd);
181
static void uvHandleUpdate(SSvrRespMsg* pMsg, SWorkThrd* thrd);
182
static void (*transAsyncHandle[])(SSvrRespMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
183
                                                                        uvHandleRegister, uvHandleUpdate};
184

185
static void uvDestroyConn(uv_handle_t* handle);
186

187
// server and worker thread
188
static void* transWorkerThread(void* arg);
189
static void* transAcceptThread(void* arg);
190

191
static void destroyWorkThrd(SWorkThrd* pThrd);
192
static void destroyWorkThrdObj(SWorkThrd* pThrd);
193

194
static void sendQuitToWorkThrd(SWorkThrd* pThrd);
195

196
// add handle loop
197
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
198
static int32_t addHandleToAcceptloop(void* arg);
199

200
// Tear down a libuv loop: visit every handle with uvWalkCb, run the loop once
// more with UV_RUN_DEFAULT, then close the loop. All return values are ignored.
// NOTE(review): the expansion ends in `while (0);` - the trailing semicolon makes
// `if (x) SRV_RELEASE_UV(l); else ...` ill-formed. Left as is because call sites
// are not visible here; confirm before removing it.
#define SRV_RELEASE_UV(loop)                    \
  do {                                          \
    TAOS_UNUSED(uv_walk(loop, uvWalkCb, NULL)); \
    TAOS_UNUSED(uv_run(loop, UV_RUN_DEFAULT));  \
    TAOS_UNUSED(uv_loop_close(loop));           \
  } while (0);
206

207
// Bail out to the caller's `_return1` label when the worker thread has already
// quit. The argument is parenthesised so that any pointer expression (not just a
// plain identifier) expands correctly.
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if ((thrd)->quit) {                                 \
      tTrace("worker thread already quit, ignore msg"); \
      goto _return1;                                    \
    }                                                   \
  } while (0)
214

215
// libuv alloc callback for an established connection: asks transAllocBuffer to
// fill `buf` from the connection's read buffer and logs if that fails.
// `suggested_size` is not used.
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSvrConn* pConn = handle->data;

  int32_t ret = transAllocBuffer(&pConn->readBuf, buf);
  if (ret >= 0) {
    return;
  }
  tError("conn:%p, failed to alloc buffer, since %s", pConn, tstrerror(ret));
}
10,565,925✔
223

224
// refers specifically to query or insert timeout
225
static void uvHandleActivityTimeout(uv_timer_t* handle) {
×
226
  SSvrConn* conn = handle->data;
×
227
  tDebug("%p timeout since no activity", conn);
×
228
}
×
229

230
// Returns true when address `ip` falls inside `pRange`.
// Type 0 ranges go through the IPv4 subnet helpers, type 1 ranges through
// transUtilCheckDualIp; any conversion/initialisation failure or any other
// range type yields false.
static bool uvCheckIp(SIpRange* pRange, SIpAddr* ip) {
  SIpRange target = {0};
  if (tIpStrToUint(ip, &target) != 0) {
    return false;
  }

  if (pRange->type == 1) {
    return transUtilCheckDualIp(pRange, &target);
  }
  if (pRange->type != 0) {
    return false;
  }

  SubnetUtils subnet = {0};
  SIpV4Range  v4 = pRange->ipV4;
  if (subnetInit(&subnet, &v4) != 0) {
    return false;
  }
  return subnetCheckIp(&subnet, target.ipV4.ip);
}
249
SIpWhiteListTab* uvWhiteListCreate() {
47,196✔
250
  SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
47,196!
251
  if (pWhiteList == NULL) {
47,196!
252
    return NULL;
×
253
  }
254

255
  pWhiteList->pList = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
47,196✔
256
  if (pWhiteList->pList == NULL) {
47,196!
257
    taosMemoryFree(pWhiteList);
×
258
    return NULL;
×
259
  }
260

261
  pWhiteList->ver = -1;
47,196✔
262
  return pWhiteList;
47,196✔
263
}
264
// Frees a table created by uvWhiteListCreate together with every per-user
// entry and the range list each entry owns.
// Accepts NULL (no-op) so it can be called unconditionally on cleanup paths,
// e.g. after uvWhiteListCreate returned NULL.
void uvWhiteListDestroy(SIpWhiteListTab* pWhite) {
  if (pWhite == NULL) {
    return;
  }

  SHashObj* pTable = pWhite->pList;
  for (void* pIter = taosHashIterate(pTable, NULL); pIter != NULL; pIter = taosHashIterate(pTable, pIter)) {
    SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
    // uvWhiteListAdd treats a NULL slot as possible, so tolerate it here too.
    if (pUserList != NULL) {
      taosMemoryFree(pUserList->pList);
      taosMemoryFree(pUserList);
    }
  }
  taosHashCleanup(pTable);
  taosMemoryFree(pWhite);
}
47,196✔
277

278
// Renders one user's white list as "user:<name>, ver:<ver>, ip:{<ranges>}".
//
// On success *ppBuf receives a heap buffer the caller must release with
// taosMemoryFree, and the string length is returned. A negative value from
// transUtilSWhiteListToStr, or terrno on allocation failure, is returned
// otherwise; *ppBuf is left untouched in those cases.
int32_t uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
  char*   tmp = NULL;
  int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
  if (tlen < 0) {
    return tlen;
  }

  // Size the buffer from its real inputs: range text + user name + 64 bytes for
  // the fixed text, the 64-bit version (at most 20 characters) and the NUL.
  size_t cap = (size_t)tlen + strlen(user) + 64;
  char*  pBuf = taosMemoryCalloc(1, cap);
  if (pBuf == NULL) {
    int32_t code = terrno;  // capture before the free below can touch it
    taosMemoryFree(tmp);    // previously leaked on this path
    return code;
  }

  int32_t len = snprintf(pBuf, cap, "user:%s, ver:%" PRId64 ", ip:{%s}", user, plist->ver, tmp != NULL ? tmp : "");
  taosMemoryFree(tmp);
  if (len >= 0 && (size_t)len >= cap) {
    len = (int32_t)(cap - 1);  // output was truncated; report what was stored
  }

  *ppBuf = pBuf;
  return len;
}
296
// Emits one trace line per user in the table (see uvWhiteListToStr for the
// format). Purely diagnostic; the table is not modified.
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
  SHashObj* pTable = pWrite->pList;

  for (void* pIter = taosHashIterate(pTable, NULL); pIter != NULL; pIter = taosHashIterate(pTable, pIter)) {
    size_t klen = 0;
    char   user[TSDB_USER_LEN + 1] = {0};
    char*  pUser = taosHashGetKey(pIter, &klen);
    if (pUser != NULL) {
      // Keys are inserted with strlen(user) by uvWhiteListAdd and are not bounded
      // there, so clamp to keep the copy inside the stack buffer (and terminated).
      if (klen > sizeof(user) - 1) {
        klen = sizeof(user) - 1;
      }
      memcpy(user, pUser, klen);
    }

    SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;

    char*   buf = NULL;
    int32_t len = uvWhiteListToStr(pUserList, user, &buf);
    if (len > 0) {
      tTrace("ip-white-list debug str %s ", buf);
    }
    taosMemoryFree(buf);
  }
}
30✔
317
// Installs `plist` (published with version `ver`) as the white list of `user`.
// An existing entry has its old range list freed and replaced; otherwise a new
// entry is created and inserted. Returns 0 on success, terrno when the entry
// cannot be allocated, or the taosHashPut error code.
// NOTE(review): on success the table keeps `plist` (it is freed on replace and
// on destroy); on failure `plist` is not freed here - confirm callers free it.
int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteListDual* plist, int64_t ver) {
  SHashObj* pTable = pWhite->pList;
  size_t    keyLen = strlen(user);

  SWhiteUserList** ppExisting = taosHashGet(pTable, user, keyLen);
  if (ppExisting != NULL && *ppExisting != NULL) {
    SWhiteUserList* pEntry = *ppExisting;

    taosMemoryFreeClear(pEntry->pList);
    pEntry->ver = ver;
    pEntry->pList = plist;
  } else {
    SWhiteUserList* pEntry = taosMemoryCalloc(1, sizeof(SWhiteUserList));
    if (pEntry == NULL) {
      return terrno;
    }
    pEntry->ver = ver;
    pEntry->pList = plist;

    int32_t code = taosHashPut(pTable, user, keyLen, &pEntry, sizeof(void*));
    if (code != 0) {
      taosMemoryFree(pEntry);
      return code;
    }
  }

  uvWhiteListDebug(pWhite);
  return 0;
}
347

348
// Returns true when `ip` equals the default address produced by
// createDefaultIp4Range (target type 0) or createDefaultIp6Range (any other
// type). Any conversion/creation error is logged and reported as false.
static bool uvWhiteListIsDefaultAddr(SIpAddr* ip) {
  int32_t  code = 0;
  int32_t  lino = 0;
  bool     isDefault = false;
  SIpRange probe = {0};
  SIpRange dflt = {0};

  code = tIpStrToUint(ip, &probe);
  TAOS_CHECK_GOTO(code, &lino, _error);

  if (probe.type == 0) {
    code = createDefaultIp4Range(&dflt);
    TAOS_CHECK_GOTO(code, &lino, _error);

    isDefault = (probe.ipV4.ip == dflt.ipV4.ip);
  } else {
    code = createDefaultIp6Range(&dflt);
    TAOS_CHECK_GOTO(code, &lino, _error);

    isDefault = (dflt.ipV6.addr[0] == probe.ipV6.addr[0] && dflt.ipV6.addr[1] == probe.ipV6.addr[1]);
  }

_error:
  if (code != 0) {
    tError("failed to create default ip range since %s", tstrerror(code));
  }
  return isDefault;
}
381

382
// Decides whether `user` connecting from `pIp` is allowed.
// Allowed when: the address is the default address; or the user's entry has
// version `ver` (already validated against that version); or the address
// matches one of the user's ranges. A user without an entry is rejected.
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, SIpAddr* pIp, int64_t ver) {
  SHashObj* pTable = pWhite->pList;

  if (uvWhiteListIsDefaultAddr(pIp)) {
    return true;
  }

  SWhiteUserList** ppEntry = taosHashGet(pTable, user, strlen(user));
  SWhiteUserList*  pEntry = (ppEntry != NULL) ? *ppEntry : NULL;
  if (pEntry == NULL) {
    return false;
  }
  if (pEntry->ver == ver) {
    return true;
  }

  SIpWhiteListDual* pRanges = pEntry->pList;
  int               idx = 0;
  while (idx < pRanges->num && !uvCheckIp(&pRanges->pIpRanges[idx], pIp)) {
    idx++;
  }
  return idx < pRanges->num;
}
406
// Returns true when the connection may proceed under the white list.
// Fast paths, in order: status / white-list retrieval message types, a client
// address equal to the server address, and a connection already validated
// against the current table version. Everything else is decided by
// uvWhiteListFilte.
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
  if (pConn->inType == TDMT_MND_STATUS) {
    return true;
  }
  if (pConn->inType == TDMT_MND_RETRIEVE_IP_WHITE || pConn->inType == TDMT_MND_RETRIEVE_IP_WHITE_DUAL) {
    return true;
  }
  if (taosIpAddrIsEqual(&pConn->clientIp, &pConn->serverIp)) {
    return true;
  }
  if (pWhite->ver == pConn->whiteListVer) {
    return true;
  }

  return uvWhiteListFilte(pWhite, pConn->user, &pConn->clientIp, pConn->whiteListVer);
}
414
// Records on the connection that it has been checked against the current
// white-list version, so uvWhiteListCheckConn can skip it until the table
// version changes.
void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
  int64_t curVer = pWhite->ver;
  pConn->whiteListVer = curVer;
}
1,515✔
418

419
// Logs a received request together with its transit time.
//
// The transit time is "now" minus the timestamp carried in the header
// (converted with taosNtoh64). Messages at or above EXCEPTION_LIMIT_US are
// logged as warnings, the rest at debug level. The more detailed variant
// (noResp/code) is used unless the connection is ConnNormal and a response is
// expected.
//
// The cost is printed as a 64-bit value: it depends on a peer-supplied
// timestamp and on clock skew, so it can exceed the range of `int`, which the
// previous `(int)cost` / `%d` pair silently truncated.
static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) {
  // if (!(rpcDebugFlag & DEBUG_DEBUG)) {
  //   return;
  // }
  static const int64_t EXCEPTION_LIMIT_US = 1000 * 1000;

  STrans*   pInst = pConn->pInst;
  STraceId* trace = &pHead->traceId;  // presumably picked up by the tG* log macros

  int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);

  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    if (cost >= EXCEPTION_LIMIT_US) {
      tGWarn("%s conn:%p, %s received from %s, local info:%s, len:%d, cost:%" PRId64
             "us, recv exception, seqNum:%" PRId64 ", sid:%" PRId64,
             transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, cost,
             pTransMsg->info.seqNum, pTransMsg->info.qId);
    } else {
      tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, cost:%" PRId64 "us, seqNum:%" PRId64
              ", sid:%" PRId64,
              transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, cost,
              pTransMsg->info.seqNum, pTransMsg->info.qId);
    }
  } else {
    if (cost >= EXCEPTION_LIMIT_US) {
      tGWarn("%s conn:%p, %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%" PRId64
             "us, recv exception, "
             "seqNum:%" PRId64 ", sid:%" PRId64,
             transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
             pHead->noResp, pTransMsg->code, cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
    } else {
      tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%" PRId64
              "us, seqNum:%" PRId64
              ", "
              "sid:%" PRId64,
              transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
              pHead->noResp, pTransMsg->code, cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
    }
  }
  tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn,
          pConn->refId);
}
9,201,303✔
459

460
// Returns 1 when the connection is rejected by the worker's IP white list and
// 0 otherwise (including when the white list is disabled). A connection that
// passes is stamped with the current white-list version.
static int8_t uvValidConn(SSvrConn* pConn) {
  SWorkThrd* pThrd = pConn->hostThrd;

  if (!pThrd->enableIpWhiteList) {
    return 0;
  }
  if (!uvWhiteListCheckConn(pThrd->pWhiteList, pConn)) {
    return 1;
  }
  uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
  return 0;
}
472

473
// Handles a TDMT_SCH_TASK_RELEASE request entirely inside the transport layer.
//
// Returns 0 when the message is not a release request; the caller keeps
// ownership of `pHead` and continues normal processing.
// Returns non-zero when the request was consumed here: `pHead` has been freed
// and must not be touched again. TSDB_CODE_RPC_ASYNC_IN_PROCESS means the
// release response was queued; any other value is an allocation error.
//
// For a known query id the registered message is handed to the application
// callback and the id is removed from the connection's table; an unknown or
// non-positive id is answered with TSDB_CODE_RPC_NO_STATE.
static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
  if (pHead->msgType != TDMT_SCH_TASK_RELEASE) {
    return 0;
  }

  int32_t code = 0;
  STrans* pInst = pConn->pInst;
  int64_t qId = taosHton64(pHead->qid);

  if (qId <= 0) {
    tError("conn:%p, recv release, but invalid sid:%" PRId64, pConn, qId);
    code = TSDB_CODE_RPC_NO_STATE;
  } else {
    void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
    if (p == NULL) {
      code = TSDB_CODE_RPC_NO_STATE;
      tTrace("conn:%p, recv release, and releady release by server sid:%" PRId64, pConn, qId);
    } else {
      SSvrRegArg* arg = p;
      (pInst->cfp)(pInst->parent, &(arg->msg), NULL);
      tTrace("conn:%p, recv release, notify server app, sid:%" PRId64, pConn, qId);

      code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
      if (code != 0) {
        tDebug("conn:%p, failed to remove sid:%" PRId64, pConn, qId);
      }
      tTrace("conn:%p, clear state,sid:%" PRId64, pConn, qId);
    }
  }

  STransMsg tmsg = {.code = code,
                    .msgType = pHead->msgType + 1,
                    .info.qId = qId,
                    .info.traceId = pHead->traceId,
                    .info.seqNum = taosHton64(pHead->seqNum)};

  SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
  if (srvMsg == NULL) {
    // Capture terrno first: the logging and free calls below may overwrite it.
    code = terrno;
    if (code == 0) {
      // pHead is freed below, so the caller must never see "not handled" (0).
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    tError("conn:%p, recv release, failed to send release-resp since %s", pConn, tstrerror(code));
    taosMemoryFree(pHead);
    return code;
  }
  srvMsg->msg = tmsg;
  srvMsg->type = Normal;
  srvMsg->pConn = pConn;

  transQueuePush(&pConn->resps, &srvMsg->q);

  uvStartSendRespImpl(srvMsg);
  taosMemoryFree(pHead);
  return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
}
523

524
// Extracts the user name that the first packet of a connection carries right
// after the transport header.
//
// When the connection has no user yet and the header has `withUserInfo` set,
// the user field is copied into pConn->user, a new packet without that field
// is allocated, the old one is freed, and *ppHead / *msgLen are updated.
// Returns true in that case. Returns false when nothing was extracted (user
// already known, flag not set, malformed length, or allocation failure); the
// caller then still owns the original *ppHead.
//
// The length comes from the network, so it is validated before any size
// arithmetic: without the check `len - header - user` wraps around as size_t
// and the copy runs far past the packet.
bool uvConnMayGetUserInfo(SSvrConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
  if (pConn->userInited) {
    return false;
  }

  STrans*        pInst = pConn->pInst;
  STransMsgHead* pHead = *ppHead;
  int32_t        len = *msgLen;
  if (!pHead->withUserInfo) {
    return false;
  }

  const size_t userLen = sizeof(pInst->user);
  if (len < 0 || (size_t)len < sizeof(STransMsgHead) + userLen || (size_t)len < (size_t)TRANS_MSG_OVERHEAD + userLen) {
    tError("conn:%p, failed to get user info since packet is too short, len:%d", pConn, len);
    return false;
  }

  STransMsgHead* tHead = taosMemoryCalloc(1, len - userLen);
  if (tHead == NULL) {
    tError("conn:%p, failed to get user info since %s", pConn, tstrerror(terrno));
    return false;
  }
  // Rebuild the packet as header + payload, dropping the user field in between.
  memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
  memcpy((char*)tHead + TRANS_MSG_OVERHEAD, (char*)pHead + TRANS_MSG_OVERHEAD + userLen,
         len - sizeof(STransMsgHead) - userLen);
  tHead->msgLen = htonl(htonl(pHead->msgLen) - userLen);

  // Never read more than the user field itself, and always leave a terminated
  // string: later code calls strlen() on pConn->user.
  size_t copyLen = sizeof(pConn->user) < userLen ? sizeof(pConn->user) : userLen;
  memset(pConn->user, 0, sizeof(pConn->user));
  memcpy(pConn->user, (char*)pHead + TRANS_MSG_OVERHEAD, copyLen);
  pConn->user[sizeof(pConn->user) - 1] = '\0';
  pConn->userInited = 1;

  taosMemoryFree(pHead);
  *ppHead = tHead;
  *msgLen = len - userLen;
  return true;
}
553
// Processes one complete packet from the connection's read buffer.
//
// Steps: dump the packet, decompress it, pick up the user name from the first
// packet, run the IP white-list check, answer release requests locally, and
// otherwise build an STransMsg and hand it to the application callback.
//
// Returns true when the packet was consumed, false when it is invalid; the
// caller then treats the connection as broken. On every false return the packet
// buffer is freed here. On success the packet memory is either freed by
// uvMayHandleReleaseReq or passed on to the application through
// transMsg.pCont.
static bool uvHandleReq(SSvrConn* pConn) {
  STrans*    pInst = pConn->pInst;
  SWorkThrd* pThrd = pConn->hostThrd;

  STransMsgHead* pHead = NULL;

  int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, 0);
  if (msgLen <= 0) {
    tError("%s conn:%p, read invalid packet", transLabel(pInst), pConn);
    return false;
  }
  if (transDecompressMsg((char**)&pHead, &msgLen) < 0) {
    tError("%s conn:%p, recv invalid packet, failed to decompress", transLabel(pInst), pConn);
    taosMemoryFree(pHead);
    return false;
  }

  if (uvConnMayGetUserInfo(pConn, &pHead, &msgLen) == true) {
    tDebug("%s conn:%p, get user info", transLabel(pInst), pConn);
  } else {
    // A connection must learn its user from the first packet; without one the
    // packet is rejected.
    if (pConn->userInited == 0) {
      taosMemoryFree(pHead);
      tDebug("%s conn:%p, failed get user info since %s", transLabel(pInst), pConn, tstrerror(terrno));
      return false;
    }
    tTrace("%s conn:%p, no need get user info", transLabel(pInst), pConn);
  }

  tTrace("%s conn:%p, not reset read buf", transLabel(pInst), pConn);

  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);

  pConn->inType = pHead->msgType;

  int8_t forbiddenIp = 0;
  if (pThrd->enableIpWhiteList && tsEnableWhiteList) {
    forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
    if (forbiddenIp == 0) {
      uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
    }
  }

  // Non-zero means the release request was consumed and pHead already freed.
  if (uvMayHandleReleaseReq(pConn, pHead)) {
    return true;
  }

  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;

  if (pHead->seqNum == 0) {
    STraceId* trace = &pHead->traceId;
    tGError("%s conn:%p, received invalid seqNum, msgType:%s", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType));
    // The packet is not handed to anyone on this path; free it (was leaked).
    taosMemoryFree(pHead);
    return false;
  }

  transMsg.info.handle = (void*)transAcquireExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
  transMsg.info.refIdMgt = pThrd->connRefMgt;

  // pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
  transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId;
  transMsg.info.traceId = pHead->traceId;
  transMsg.info.cliVer = htonl(pHead->compatibilityVer);
  transMsg.info.forbiddenIp = forbiddenIp;
  transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
  transMsg.info.seqNum = taosHton64(pHead->seqNum);
  transMsg.info.qId = taosHton64(pHead->qid);
  transMsg.info.msgType = pHead->msgType;

  uvPerfLog_receive(pConn, pHead, &transMsg);

  // set up conn info
  SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
  pConnInfo->cliAddr = pConn->clientIp;
  // pConnInfo->clientPort = pConn->port;
  tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));

  transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);

  (*pInst->cfp)(pInst->parent, &transMsg, NULL);
  return true;
}
645

646
// libuv read callback for an established connection.
//
// nread > 0: the bytes are already in the connection's read buffer; every
//            complete packet is passed to uvHandleReq. An invalid packet marks
//            the connection broken and drops a reference.
// nread == 0: nothing to do.
// nread < 0: read error; the connection is marked broken and a reference is
//            dropped.
// If the worker is quitting, the connection is destroyed right away.
//
// NOTE(review): the "exceed limit" path drops a reference without setting
// `broken`, unlike the other two failure paths - confirm whether intentional.
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSvrConn*  conn = cli->data;
  STrans*    pInst = conn->pInst;
  SWorkThrd* pThrd = conn->hostThrd;

  STUB_RAND_NETWORK_ERR(nread);

  if (pThrd->quit) {
    tInfo("work thread received quit msg, destroy conn");
    destroyConn(conn, true);
    return;
  }

  int32_t optCode = transSetReadOption((uv_handle_t*)cli);
  if (optCode != 0) {
    tWarn("%s conn:%p, failed to set recv opt since %s", transLabel(pInst), conn, tstrerror(optCode));
  }

  if (nread == 0) {
    return;
  }

  if (nread < 0) {
    tDebug("%s conn:%p, read error:%s", transLabel(pInst), conn, uv_err_name(nread));
    conn->broken = true;
    transUnrefSrvHandle(conn);
    return;
  }

  SConnBuffer* pBuf = &conn->readBuf;
  pBuf->len += nread;
  if (pBuf->len > TRANS_PACKET_LIMIT) {
    tError("%s conn:%p, read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pInst), conn,
           conn->dst, conn->src);
    transUnrefSrvHandle(conn);
    return;
  }

  while (transReadComplete(pBuf)) {
    if (true == pBuf->invalid || !uvHandleReq(conn)) {
      tError("%s conn:%p, read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst,
             conn->src);
      conn->broken = true;
      transUnrefSrvHandle(conn);
      return;
    }
  }
}
696
// libuv alloc callback that always provides a fresh zeroed 2-byte buffer;
// `handle` and `suggested_size` are ignored. On allocation failure buf->base is
// NULL and an error is logged.
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
  buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
  if (buf->base != NULL) {
    return;
  }
  tError("failed to alloc conn read buffer since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
}
163,832✔
703

704
// Timer callback: only logs that the connection stored in handle->data timed out.
void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
  SSvrConn* pTimedOut = handle->data;

  tError("conn:%p, time out", pTimedOut);
}
×
709

710
// libuv write-completion callback for a batch of responses.
//
// The responses attached to the write wrapper are moved to a local queue, the
// wrapper is returned to the connection's write-request pool, and every
// response is then logged and destroyed. On a failed write the connection is
// additionally marked broken and loses one extra reference; one reference is
// always dropped at the end.
void uvOnSendCb(uv_write_t* req, int status) {
  STUB_RAND_NETWORK_ERR(status);

  SWReqsWrapper* pWrapper = req->data;
  SSvrConn*      conn = pWrapper->arg;

  queue done;
  QUEUE_INIT(&done);
  QUEUE_MOVE(&pWrapper->node, &done);

  freeWReqToWQ(&conn->wq, pWrapper);

  tTrace("%s conn:%p, send data out", transLabel(conn->pInst), conn);

  const bool sendOk = (status == 0);
  while (!QUEUE_IS_EMPTY(&done)) {
    queue* node = QUEUE_HEAD(&done);
    QUEUE_REMOVE(node);

    SSvrRespMsg* pResp = QUEUE_DATA(node, SSvrRespMsg, q);
    STraceId*    trace = &pResp->msg.info.traceId;
    if (sendOk) {
      tGDebug("%s conn:%p, msg already send out, seqNum:%" PRId64 ", sid:%" PRId64, transLabel(conn->pInst), conn,
              pResp->msg.info.seqNum, pResp->msg.info.qId);
    } else {
      tGDebug("%s conn:%p, failed to send, seqNum:%" PRId64 ", sid:%" PRId64 " since %s", transLabel(conn->pInst), conn,
              pResp->msg.info.seqNum, pResp->msg.info.qId, uv_err_name(status));
    }
    destroySmsg(pResp);
  }

  if (!sendOk) {
    conn->broken = true;
    transUnrefSrvHandle(conn);
  }
  transUnrefSrvHandle(conn);
}
10,439,650✔
752
// Completion callback of the pipe write that dispatches a connection to a
// worker thread. Logs the outcome, then disposes of the handle stored in
// req->data and of the write request itself.
// NOTE(review): when the handle is already closing it is freed directly here;
// confirm no close callback will run on it afterwards.
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  STUB_RAND_NETWORK_ERR(status);
  if (status != 0) {
    tError("fail to dispatch conn to work thread since %s", uv_strerror(status));
  } else {
    tTrace("success to dispatch conn to work thread");
  }

  if (uv_is_closing((uv_handle_t*)req->data)) {
    taosMemoryFree(req->data);
  } else {
    uv_close((uv_handle_t*)req->data, uvFreeCb);
  }
  taosMemoryFree(req);
}
116,934✔
766

767
// Fills in the transport header in front of a response payload and describes
// the bytes to write in `wb`.
//
// A response without a payload gets an empty one from rpcMallocCont(0).
// The payload is compressed when it is not already compressed, the client is
// not on the server's own address, and it is larger than the instance's
// compressSize (-1 disables compression).
// Returns 0 on success or terrno if the empty payload cannot be allocated.
static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
  SSvrConn*  pConn = smsg->pConn;
  STrans*    pInst = pConn->pInst;
  STransMsg* pResp = &smsg->msg;

  if (pResp->pCont == NULL) {
    pResp->pCont = (void*)rpcMallocCont(0);
    if (pResp->pCont == NULL) {
      return terrno;
    }
    pResp->contLen = 0;
  }

  STransMsgHead* pHead = transHeadFromCont(pResp->pCont);
  pHead->traceId = pResp->info.traceId;
  pHead->hasEpSet = pResp->info.hasEpSet;
  pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  pHead->compatibilityVer = htonl(pInst->compatibilityVer);
  pHead->version = TRANS_VER;
  pHead->seqNum = taosHton64(pResp->info.seqNum);
  pHead->qid = taosHton64(pResp->info.qId);
  pHead->withUserInfo = (pConn->userInited == 0) ? 1 : 0;

  // A response without an explicit type answers the last request type + 1.
  // (An earlier special case for invalid drop_task responses, TD-20098, is
  // disabled.)
  if (pResp->msgType == 0) {
    pHead->msgType = pConn->inType + 1;
  } else {
    pHead->msgType = pResp->msgType;
  }
  pHead->code = htonl(pResp->code);
  pHead->msgLen = htonl(pResp->contLen + sizeof(STransMsgHead));

  int32_t len = transMsgLenFromCont(pResp->contLen);

  bool compress = pResp->info.compressed == 0 && !taosIpAddrIsEqual(&pConn->clientIp, &pConn->serverIp) &&
                  pInst->compressSize != -1 && pInst->compressSize < pResp->contLen;
  if (compress) {
    len = transCompressMsg(pResp->pCont, pResp->contLen) + sizeof(STransMsgHead);
    pHead->msgLen = (int32_t)htonl((uint32_t)len);
  }

  STraceId* trace = &pResp->info.traceId;
  tGDebug("%s conn:%p, %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64, transLabel(pInst),
          pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pResp->info.seqNum, pResp->info.qId);

  wb->base = (char*)pHead;
  wb->len = len;
  return 0;
}
817

818
// Move every pending response of `pConn` into the connection's uv_buf_t array.
//
// Each response popped from pConn->resps is serialized by uvPrepareSendData(), flagged as sent
// and appended to `toSendQ`. On success `*ppBuf` / `*bufNum` describe the filled buffers; both
// are left untouched when nothing was queued.
//
// Returns 0 on success (including the "nothing to send" case), terrno when the buffer array
// cannot be grown, or the error returned by uvPrepareSendData().
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) {
  int32_t code = 0;
  int32_t size = transQueueSize(&pConn->resps);
  tTrace("%s conn:%p, has %d msg to send", transLabel(pConn->pInst), pConn, size);
  if (size == 0) {
    return 0;
  }

  if (pConn->bufSize < size) {
    // Grow through a temporary: assigning the result straight to pConn->buf would lose (and
    // leak) the old array when the reallocation fails.
    uv_buf_t* pNewBuf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t));
    if (pNewBuf == NULL) {
      return terrno;
    }
    pConn->buf = pNewBuf;
    pConn->bufSize = size;
  }
  uv_buf_t* pWb = pConn->buf;

  int32_t count = 0;

  while (transQueueSize(&pConn->resps) > 0) {
    queue*       el = transQueuePop(&pConn->resps);
    SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q);
    uv_buf_t     wb;
    code = uvPrepareSendData(pMsg, &wb);
    if (code != 0) {
      return code;
    }
    pWb[count] = wb;
    pMsg->sent = 1;
    QUEUE_PUSH(toSendQ, &pMsg->q);
    count++;
  }

  if (count == 0) {
    return 0;
  }

  *bufNum = count;
  *ppBuf = pWb;

  return 0;
}
860

861
static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
862
  int32_t   code = 0;
10,437,455✔
863
  SSvrConn* pConn = smsg->pConn;
10,437,455✔
864
  if (pConn->broken) {
10,437,455!
865
    return;
×
866
  }
867
  int32_t size = transQueueSize(&pConn->resps);
10,437,455✔
868
  if (size == 0) {
10,437,145!
869
    tTrace("%s conn:%p, has %d msg to send", transLabel(pConn->pInst), pConn, size);
×
870
    return;
×
871
  }
872

873
  uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
10,437,145✔
874
  if (req == NULL) {
10,434,976!
875
    uError("%s conn:%p, failed to alloc write req since %s", transLabel(pConn->pInst), pConn, tstrerror(terrno));
×
876
    transUnrefSrvHandle(pConn);
×
877
    return;
×
878
  }
879
  SWReqsWrapper* pWreq = req->data;
10,434,976✔
880

881
  uv_buf_t* pBuf = NULL;
10,434,976✔
882
  int32_t   bufNum = 0;
10,434,976✔
883
  code = uvBuildToSendData(pConn, &pBuf, &bufNum, &pWreq->node);
10,434,976✔
884
  if (code != 0) {
10,439,124!
885
    tError("%s conn:%p, failed to send data", transLabel(pConn->pInst), pConn);
×
886
    return;
×
887
  }
888
  if (bufNum == 0) {
10,439,132!
889
    tDebug("%s conn:%p, no data to send", transLabel(pConn->pInst), pConn);
×
890
    return;
×
891
  }
892

893
  transRefSrvHandle(pConn);
10,439,132✔
894

895
  int32_t ret = uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
10,439,132✔
896
  if (ret != 0) {
10,438,287!
897
    tError("conn:%p, failed to write data since %s", pConn, uv_err_name(ret));
×
898
    pConn->broken = true;
×
899
    while (!QUEUE_IS_EMPTY(&pWreq->node)) {
×
900
      queue* head = QUEUE_HEAD(&pWreq->node);
×
901
      QUEUE_REMOVE(head);
×
902
      SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
×
903
      destroySmsg(smsg);
904
    }
905
    freeWReqToWQ(&pConn->wq, req->data);
×
906

907
    transUnrefSrvHandle(pConn);
×
908
  }
909
}
910
// Bookkeeping for TDMT_SCH_TASK_RELEASE responses that carry a positive qid.
//
// The entry registered under that qid in pConn->pQTable has its message content freed and is
// removed from the table. Returns TSDB_CODE_RPC_NO_STATE when no entry exists for the qid and 0
// in every other case (a failing removal is only logged).
int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
  SSvrConn* pConn = pMsg->pConn;
  int64_t   qid = pMsg->msg.info.qId;

  // Anything that is not a release response with a valid qid needs no handling here.
  if (pMsg->msg.msgType != TDMT_SCH_TASK_RELEASE || qid <= 0) {
    return 0;
  }

  SSvrRegArg* pReg = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
  if (pReg == NULL) {
    tError("%s conn:%p, already release sid:%" PRId64, transLabel(pConn->pInst), pConn, qid);
    return TSDB_CODE_RPC_NO_STATE;
  }

  transFreeMsg(pReg->msg.pCont);
  int32_t code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
  if (code != 0) {
    tError("%s conn:%p, failed to release sid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid,
           tstrerror(code));
  }
  return 0;
}
930
// Queue one response on its connection and trigger a flush.
// A release response whose state is already gone is dropped instead of being sent.
static void uvStartSendResp(SSvrRespMsg* smsg) {
  SSvrConn* pConn = smsg->pConn;

  int32_t code = uvMayHandleReleaseResp(smsg);
  if (code == TSDB_CODE_RPC_NO_STATE) {
    destroySmsg(smsg);
    return;
  }

  transQueuePush(&pConn->resps, &smsg->q);
  uvStartSendRespImpl(smsg);
}
942

943
// Free a response message together with its content buffer; NULL is accepted and ignored.
static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
  }
}
950
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrRespMsg*)smsg); }
×
951

952
// Detach every connection from the worker's connection list and drop all references to it.
static void destroyAllConn(SWorkThrd* pThrd) {
  tTrace("thread %p destroy all conn ", pThrd);
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* el = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(el);
    QUEUE_INIT(el);  // leave the node self-linked after unlinking it from the list

    SSvrConn* pConn = QUEUE_DATA(el, SSvrConn, queue);

    // Release surplus references first, then the remaining one.
    while (pConn->ref > 1) {
      transUnrefSrvHandle(pConn);
    }
    transUnrefSrvHandle(pConn);
  }
}
15,485✔
966
// Async callback of a worker thread: drains the messages posted to this async item and
// dispatches each one through transAsyncHandle[].
//
// Quit/Update messages are dispatched directly. Every other message must reference a connection
// handle that can still be acquired by its refId and matches the handle stored in the message;
// otherwise the message is destroyed.
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem* pItem = handle->data;
  SWorkThrd*  pThrd = pItem->pThrd;
  queue       pending;

  // Take the whole batch at once so the mutex is locked/unlocked only once per wake-up.
  if (taosThreadMutexLock(&pItem->mtx) != 0) {
    tError("failed to lock mutex");
  }

  QUEUE_MOVE(&pItem->qmsg, &pending);

  if (taosThreadMutexUnlock(&pItem->mtx) != 0) {
    tError("failed to unlock mutex");
  }

  while (!QUEUE_IS_EMPTY(&pending)) {
    queue* el = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(el);

    SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q);
    if (pMsg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }

    // Control messages are not bound to a connection handle.
    if (pMsg->type == Quit || pMsg->type == Update) {
      (*transAsyncHandle[pMsg->type])(pMsg, pThrd);
      continue;
    }

    SExHandle* pExpected = pMsg->msg.info.handle;
    int64_t    refId = pMsg->msg.info.refId;
    pMsg->seqNum = pMsg->msg.info.seqNum;

    SExHandle* pActual = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), refId);
    if (pActual == NULL || pExpected != pActual) {
      tTrace("handle except msg %p, ignore it", pExpected);
      transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
      destroySmsg(pMsg);
      continue;
    }

    pMsg->pConn = pExpected->handle;
    transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
    (*transAsyncHandle[pMsg->type])(pMsg, pThrd);
  }
}
12,183,886✔
1016
// uv_walk() visitor: close every handle that is not already closing.
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
556,657✔
1021
// Close callback that releases the memory of the handle itself.
static void uvFreeCb(uv_handle_t* handle) { taosMemoryFree(handle); }
116,934✔
1025

1026
// Async callback on the accept loop: closes every handle registered on the server's loop.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pSrv = async->data;

  tDebug("close server port %d", pSrv->port);
  uv_walk(pSrv->loop, uvWalkCb, NULL);
}
2,388✔
1031

1032
// Shutdown callback: closes the stream (uvDestroyConn runs as its close callback) and frees
// the shutdown request. A non-zero status is only logged.
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down:%s", uv_err_name(status));
  }

  uv_handle_t* pStream = (uv_handle_t*)req->handle;
  uv_close(pStream, uvDestroyConn);
  taosMemoryFree(req);
}
×
1039
// Work callback intended for time-consuming tasks (only connection auth is mentioned so far).
// Currently it only traces the request.
static void uvWorkDoTask(uv_work_t* req) {
  tTrace("conn:%p, start to be processed in BG Thread", req->data);
}
1045

1046
// Completion callback paired with uvWorkDoTask(); per the original note it is called in the
// main loop. It traces the outcome and frees the work request.
static void uvWorkAfterTask(uv_work_t* req, int status) {
  if (status != 0) {
    tTrace("conn:%p, failed to processed ", req->data);
  }

  // The time-consuming task is done; more handling may be added here later.
  tTrace("conn:%p, already processed ", req->data);
  taosMemoryFree(req);
}
×
1056

1057
// Connection callback of the listening socket.
//
// Accepts the incoming TCP connection on the accept loop and forwards the accepted handle to the
// next worker thread (round-robin) by writing the one-byte `notify` marker to that worker's pipe
// with uv_write2(). Every failure path releases the client handle and the write request it
// allocated.
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  // libuv reports failures as negative error codes, not only as -1.
  if (status < 0) {
    tError("failed to accept tcp:%s", uv_err_name(status));
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (cli == NULL) return;

  int err = uv_tcp_init(pObj->loop, cli);
  if (err != 0) {
    tError("failed to create tcp:%s", uv_err_name(err));
    taosMemoryFree(cli);
    return;
  }

  err = uv_accept(stream, (uv_stream_t*)cli);
  if (err != 0) {
    if (!uv_is_closing((uv_handle_t*)cli)) {
      tError("failed to accept tcp:%s", uv_err_name(err));
      uv_close((uv_handle_t*)cli, uvFreeCb);
    } else {
      tError("failed to accept tcp:%s", uv_err_name(err));
      taosMemoryFree(cli);
    }
    return;
  }

#if defined(WINDOWS) || defined(DARWIN)
  if (pObj->numOfWorkerReady < pObj->numOfThreads) {
    tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
           pObj->numOfWorkerReady);
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }
#endif

  uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
  if (wr == NULL) {
    tError("failed to accept since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    // The accepted handle is initialized and open: close it instead of leaking it.
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  wr->data = cli;
  uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

  pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;

  tTrace("new connection accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);

  err = uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
  if (err != 0) {
    // The write was never queued, so its callback will not run: clean up here.
    tError("failed to dispatch connection to %dth worker-thread since %s", pObj->workerIdx, uv_err_name(err));
    taosMemoryFree(wr);
    uv_close((uv_handle_t*)cli, uvFreeCb);
  }
}
1108
// Fill `ip` from a socket address: textual address, port, type (0 for IPv4, 1 for IPv6) and
// mask (32 / 128). Any other address family leaves `ip` unchanged.
void uvGetSockInfo(struct sockaddr* addr, SIpAddr* ip) {
  switch (addr->sa_family) {
    case AF_INET: {
      struct sockaddr_in* pV4 = (struct sockaddr_in*)addr;
      inet_ntop(AF_INET, &pV4->sin_addr, ip->ipv4, INET_ADDRSTRLEN);
      ip->type = 0;
      ip->port = ntohs(pV4->sin_port);
      ip->mask = 32;
      break;
    }
    case AF_INET6: {
      struct sockaddr_in6* pV6 = (struct sockaddr_in6*)addr;
      ip->port = ntohs(pV6->sin6_port);
      inet_ntop(AF_INET6, &pV6->sin6_addr, ip->ipv6, INET6_ADDRSTRLEN);
      ip->type = 1;
      ip->mask = 128;
      break;
    }
    default:
      break;
  }
}
233,760✔
1123
// Read callback on a worker's pipe: receives a connection handed over by the accept thread.
//
// The payload must be exactly the `notify` marker. When it is, a new SSvrConn is created, the
// pending handle is accepted into it, peer/local address information is recorded and reading is
// started on the TCP stream. `buf->base` is released on every path.
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  int32_t code = 0;
  STUB_RAND_NETWORK_ERR(nread);
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    tWarn("failed to create connect:%p since %s", q, uv_err_name(nread));
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    return;
  }

  size_t notifyLen = strlen(notify);
  if ((size_t)nread != notifyLen || strncmp(buf->base, notify, notifyLen) != 0) {
    tError("failed to read pip ");
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // Stop here: the buffer is already freed and the pipe is closing.
    return;
  }

  taosMemoryFree(buf->base);

  SWorkThrd* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }
  if (pThrd->quit) {
    tWarn("thread already received quit msg, ignore incoming conn");
    return;
  }

  SSvrConn* pConn = createConn(pThrd);
  if (pConn == NULL) {
    return;
  }

  code = uv_accept(q, (uv_stream_t*)(pConn->pTcp));
  if (code != 0) {
    tDebug("failed to create new connection reason %s", uv_err_name(code));
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_os_fd_t fd;
  TAOS_UNUSED(uv_fileno((const uv_handle_t*)pConn->pTcp, &fd));
  tTrace("conn:%p, created, fd:%d", pConn, fd);

  struct sockaddr_storage peername, sockname;

  // Get and validate the peer info
  int addrlen = sizeof(peername);
  if ((code = uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&peername, &addrlen)) != 0) {
    tError("conn:%p, failed to get peer info since %s", pConn, uv_strerror(code));
    transUnrefSrvHandle(pConn);
    return;
  }

  if (peername.ss_family != AF_INET && peername.ss_family != AF_INET6) {
    tError("conn:%p, failed to get peer info since not support other protocol except ipv4", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  // Get and validate the local sock info
  addrlen = sizeof(sockname);
  if ((code = uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) != 0) {
    tError("conn:%p, failed to get local info since %s", pConn, uv_strerror(code));
    transUnrefSrvHandle(pConn);
    return;
  }
  // Both family tests must look at the local address (the IPv6 test used to read peername).
  if (sockname.ss_family != AF_INET && sockname.ss_family != AF_INET6) {
    tError("conn:%p, failed to get sock info since not support other protocol except ipv4", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  TAOS_UNUSED(transSockInfo2Str((struct sockaddr*)&peername, pConn->dst));
  TAOS_UNUSED(transSockInfo2Str((struct sockaddr*)&sockname, pConn->src));

  uvGetSockInfo((struct sockaddr*)&peername, &pConn->clientIp);
  uvGetSockInfo((struct sockaddr*)&sockname, &pConn->serverIp);

  pConn->port = pConn->clientIp.port;

  code = transSetConnOption((uv_tcp_t*)pConn->pTcp, 20);
  if (code != 0) {
    // Failing to set the TCP option is not fatal for the connection.
    tWarn("failed to set tcp option since %s", tstrerror(code));
  }
  code = uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
  if (code != 0) {
    tWarn("conn:%p, failed to start to read since %s", pConn, uv_err_name(code));
    transUnrefSrvHandle(pConn);
    return;
  }
}
1221

1222
void* transAcceptThread(void* arg) {
2,388✔
1223
  // opt
1224
  setThreadName("trans-accept");
2,388✔
1225
  SServerObj* srv = (SServerObj*)arg;
2,388✔
1226
  TAOS_UNUSED(uv_run(srv->loop, UV_RUN_DEFAULT));
2,388✔
1227

1228
  return NULL;
2,388✔
1229
}
1230
// Connect callback of a worker's pipe: once the pipe is connected, start reading handed-over
// connections from it. A failed connect is ignored.
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
  STUB_RAND_NETWORK_ERR(status);
  if (status == 0) {
    SWorkThrd* pWorker = container_of(connect, SWorkThrd, connect_req);
    TAOS_UNUSED(uv_read_start((uv_stream_t*)pWorker->pipe, uvAllocConnBufferCb, uvOnConnectionCb));
  }
}
1239
// Prepare a worker thread's event loop: allocate and init the loop, set up the pipe used to
// receive connections, init the message/connection queues and create the async pool.
//
// On WINDOWS/DARWIN the pipe is connected to `pipeName`; elsewhere it is opened on pThrd->fd and
// reading starts immediately. Returns 0 on success, terrno on allocation failure, the
// transAsyncPoolCreate() error, or TSDB_CODE_THIRDPARTY_ERROR for libuv failures.
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
  int32_t code = 0;

  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
  if (pThrd->loop == NULL) {
    return terrno;
  }

  code = uv_loop_init(pThrd->loop);
  if (code != 0) {
    tError("failed to init loop since %s", uv_err_name(code));
    return TSDB_CODE_THIRDPARTY_ERROR;
  }

  // The pipe is initialized the same way on every platform.
  code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
  if (code != 0) {
    tError("failed to init pip since %s", uv_err_name(code));
    return TSDB_CODE_THIRDPARTY_ERROR;
  }

#if !defined(WINDOWS) && !defined(DARWIN)
  code = uv_pipe_open(pThrd->pipe, pThrd->fd);
  if (code != 0) {
    tError("failed to open pip since %s", uv_err_name(code));
    return TSDB_CODE_THIRDPARTY_ERROR;
  }
#endif

  pThrd->pipe->data = pThrd;

  QUEUE_INIT(&pThrd->msg);
  QUEUE_INIT(&pThrd->conn);  // connection set

  code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool);
  if (code != 0) {
    tError("failed to init async pool since:%s", tstrerror(code));
    return code;
  }

#if defined(WINDOWS) || defined(DARWIN)
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
#else
  code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
  if (code != 0) {
    tError("failed to start read pipe:%s", uv_err_name(code));
    return TSDB_CODE_THIRDPARTY_ERROR;
  }
#endif
  return 0;
}
1295

1296
static int32_t addHandleToAcceptloop(void* arg) {
2,388✔
1297
  // impl later
1298
  SServerObj* srv = arg;
2,388✔
1299

1300
  int code = 0;
2,388✔
1301
  if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) {
2,388!
1302
    tError("failed to init accept server since %s", uv_err_name(code));
×
1303
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1304
  }
1305

1306
  // register an async here to quit server gracefully
1307
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
2,388!
1308
  if (srv->pAcceptAsync == NULL) {
2,388!
1309
    tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
1310
    return terrno;
×
1311
  }
1312

1313
  code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
2,388✔
1314
  if (code != 0) {
2,388!
1315
    tError("failed to init async since:%s", uv_err_name(code));
×
1316
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1317
  }
1318
  srv->pAcceptAsync->data = srv;
2,388✔
1319

1320
  if (srv->ipv6) {
2,388!
1321
    struct sockaddr_in6 bind_addr;
1322
    if ((code = uv_ip6_addr("::", srv->port, &bind_addr)) != 0) {
×
1323
      tError("failed to bind addr since %s", uv_err_name(code));
×
1324
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1325
    }
1326

1327
    if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 1)) != 0) {
×
1328
      tError("failed to bind since %s", uv_err_name(code));
×
1329
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1330
    }
1331
    tInfo("bind to ipv6 addr");
×
1332
  } else {
1333
    struct sockaddr_in bind_addr;
1334
    if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) {
2,388!
1335
      tError("failed to bind addr since %s", uv_err_name(code));
×
1336
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1337
    }
1338

1339
    if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
2,388!
1340
      tError("failed to bind since %s", uv_err_name(code));
×
1341
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1342
    }
1343
    tInfo("bind to ipv4 addr");
2,388!
1344
  }
1345
  if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
2,388!
1346
    tError("failed to listen since %s", uv_err_name(code));
×
1347
    return TSDB_CODE_RPC_PORT_EADDRINUSE;
×
1348
  }
1349
  return 0;
2,388✔
1350
}
1351

1352
void* transWorkerThread(void* arg) {
47,196✔
1353
  int32_t code = 0;
47,196✔
1354
  setThreadName("trans-svr-work");
47,196✔
1355
  SWorkThrd* pThrd = (SWorkThrd*)arg;
47,196✔
1356
  tsEnableRandErr = true;
47,196✔
1357
  code = uv_run(pThrd->loop, UV_RUN_DEFAULT);
47,196✔
1358
  if (code != 0) {
47,196!
1359
    tWarn("failed to start to worker thread since %s", uv_err_name(code));
×
1360
  }
1361

1362
  return NULL;
47,196✔
1363
}
1364
void uvDestroyResp(void* e) {
×
1365
  SSvrRespMsg* pMsg = QUEUE_DATA(e, SSvrRespMsg, q);
×
1366
  destroySmsg(pMsg);
1367
}
×
1368
// Allocate and initialize a server connection owned by worker thread `hThrd`.
//
// Sets up the response queue, read buffer, external handle (registered in the thread's ref
// manager), qid table, write-request queue, TCP handle and uv_buf_t array, links the connection
// into the thread's connection list and takes one reference on it.
//
// Returns the new connection, or NULL after releasing everything acquired so far.
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
  int32_t    code = 0;
  SWorkThrd* pThrd = hThrd;
  // Declared up front: the _end label logs with pInst and is reached by gotos from early failures.
  STrans*    pInst = pThrd->pInst;
  int32_t    lino = 0;
  int8_t     wqInited = 0;
  SSvrConn*  pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
  if (pConn == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
  }

  QUEUE_INIT(&pConn->queue);

  if ((code = transQueueInit(&pConn->resps, uvDestroyResp)) != 0) {
    TAOS_CHECK_GOTO(code, &lino, _end);
  }

  if ((code = transInitBuffer(&pConn->readBuf)) != 0) {
    TAOS_CHECK_GOTO(code, &lino, _end);
  }

  pConn->broken = false;
  pConn->status = ConnNormal;

  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  if (exh == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
  }

  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = transAddExHandle(uvGetConnRefOfThrd(pThrd), exh);
  if (exh->refId < 0) {
    // Registration failed, so nothing else references exh: free it here.
    taosMemoryFree(exh);
    TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, &lino, _end);
  }

  SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), exh->refId);
  if (pSelf != exh) {
    TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
  }

  pConn->refId = exh->refId;

  QUEUE_INIT(&exh->q);
  tTrace("%s handle %p, conn:%p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);

  pConn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pConn->pQTable == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
  }

  code = initWQ(&pConn->wq);
  TAOS_CHECK_GOTO(code, &lino, _end);
  wqInited = 1;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (pConn->pTcp == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
  }

  pConn->bufSize = pInst->shareConnLimit;
  pConn->buf = taosMemoryCalloc(1, pInst->shareConnLimit * sizeof(uv_buf_t));
  if (pConn->buf == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
  }

  code = uv_tcp_init(pThrd->loop, pConn->pTcp);
  if (code != 0) {
    // The format string previously had a stray PRId64 appended to it.
    tError("%s failed to create conn since %s", transLabel(pInst), uv_strerror(code));
    TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
  }
  pConn->pTcp->data = pConn;
  QUEUE_PUSH(&pThrd->conn, &pConn->queue);

  pConn->pInst = pThrd->pInst;
  pConn->hostThrd = pThrd;

  transRefSrvHandle(pConn);
  return pConn;
_end:
  if (pConn) {
    if (pConn->refId > 0) {
      transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
      transRemoveExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
      pConn->refId = -1;
    }

    transQueueDestroy(&pConn->resps);
    transDestroyBuffer(&pConn->readBuf);
    taosHashCleanup(pConn->pQTable);
    taosMemoryFree(pConn->pTcp);
    if (wqInited) destroyWQ(&pConn->wq);
    taosMemoryFree(pConn->buf);
    taosMemoryFree(pConn);
    pConn = NULL;
  }
  tError("%s failed to create conn since %s", transLabel(pInst), tstrerror(code));
  return NULL;
}
1468

1469
// Start tearing down a connection: when `clear` is set and the TCP handle is not already
// closing, close it with uvDestroyConn as the close callback. NULL is ignored.
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
  if (conn == NULL || !clear) {
    return;
  }

  uv_handle_t* pHandle = (uv_handle_t*)conn->pTcp;
  if (!uv_is_closing(pHandle)) {
    tTrace("conn:%p, to be destroyed", conn);
    uv_close(pHandle, uvDestroyConn);
  }
}
1481

1482
// Notify the server application about every entry still registered in the connection's qid
// table, then destroy the table.
//
// For each entry the instance callback `cfp` is invoked with the stored message. The table
// pointer on the connection is cleared afterwards so it cannot be used again.
void uvConnDestroyAllState(SSvrConn* p) {
  STrans*   pInst = p->pInst;
  SHashObj* pQTable = p->pQTable;
  if (pQTable == NULL) return;

  void* pIter = taosHashIterate(pQTable, NULL);
  while (pIter) {
    SSvrRegArg* arg = pIter;
    int64_t*    qid = taosHashGetKey(pIter, NULL);
    (pInst->cfp)(pInst->parent, &(arg->msg), NULL);
    tTrace("conn:%p, broken, notify server app, sid:%" PRId64, p, *qid);
    pIter = taosHashIterate(pQTable, pIter);
  }

  taosHashCleanup(pQTable);
  // Clear the connection's own pointer; nulling only the local copy would leave it dangling.
  p->pQTable = NULL;
}
1500
// Close callback of a connection's TCP handle: releases everything owned by the connection.
//
// Drops the external handle from the thread's ref manager, destroys the response queue, unlinks
// the connection from the thread's list, frees the TCP handle, notifies/destroys the qid state
// and frees the buffers and the connection itself. When the thread is quitting and this was its
// last connection, the remaining handles of the loop are closed.
static void uvDestroyConn(uv_handle_t* handle) {
  SSvrConn* pConn = handle->data;
  if (pConn == NULL) {
    return;
  }

  SWorkThrd* pThrd = pConn->hostThrd;

  transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
  transRemoveExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);

  STrans* pInst = pThrd->pInst;
  tDebug("%s conn:%p, destroy", transLabel(pInst), pConn);

  transQueueDestroy(&pConn->resps);
  QUEUE_REMOVE(&pConn->queue);

  taosMemoryFree(pConn->pTcp);

  uvConnDestroyAllState(pConn);
  transDestroyBuffer(&pConn->readBuf);
  destroyWQ(&pConn->wq);

  taosMemoryFree(pConn->buf);
  taosMemoryFree(pConn);

  if (pThrd->quit && QUEUE_IS_EMPTY(&pThrd->conn)) {
    tTrace("work thread quit");
    uv_walk(pThrd->loop, uvWalkCb, NULL);
  }
}
1533
// Connection callback of the server's listening pipe: accepts one worker's pipe connection into
// the next free slot of srv->pipe[] and counts the worker as ready once the accepted pipe is
// readable, writable and not closing.
static void uvPipeListenCb(uv_stream_t* handle, int status) {
  if (status != 0) {
    tError("server failed to init pipe, errmsg:%s", uv_err_name(status));
    return;
  }

  SServerObj* srv = container_of(handle, SServerObj, pipeListen);

  // srv->pipe holds numOfThreads slots; never index past them.
  if (srv->numOfWorkerReady >= srv->numOfThreads) {
    tError("trans-svr failed to accept pipe, all %d worker pipes are already connected", srv->numOfThreads);
    return;
  }
  uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);

  int ret = uv_pipe_init(srv->loop, pipe, 1);
  if (ret != 0) {
    tError("trans-svr failed to init pipe, errmsg:%s", uv_err_name(ret));
    // Do not go on and accept into a pipe that failed to initialize.
    return;
  }

  ret = uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe);
  if (ret != 0) {
    tError("trans-svr failed to accept pipe, errmsg:%s", uv_err_name(ret));
    return;
  }

  ret = uv_is_readable((uv_stream_t*)pipe);
  if (ret != 1) {
    tError("trans-svr failed to check pipe, pip not readable");
    return;
  }
  ret = uv_is_writable((uv_stream_t*)pipe);
  if (ret != 1) {
    tError("trans-svr failed to check pipe, pip not writable");
    return;
  }

  ret = uv_is_closing((uv_handle_t*)pipe);
  if (ret != 0) {
    tError("trans-svr failed to check pipe, pip is closing");
    return;
  }
  srv->numOfWorkerReady++;
}
1571

1572
void* transInitServer(SIpAddr* addr, char* label, int numOfThreads, void* fp, void* pInit) {
2,388✔
1573
  int32_t code = 0;
2,388✔
1574

1575
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
2,388!
1576
  if (srv == NULL) {
2,388!
1577
    code = terrno;
×
1578
    tError("failed to init server since:%s", tstrerror(code));
×
1579
    return NULL;
×
1580
  }
1581

1582
  STrans* pInst = (STrans*)pInit;
2,388✔
1583

1584
  srv->ipv6 = pInst->ipv6;
2,388✔
1585
  srv->addr = *addr;
2,388✔
1586
  srv->ip = 0;
2,388✔
1587
  srv->port = addr->port;
2,388✔
1588
  srv->numOfThreads = numOfThreads;
2,388✔
1589
  srv->workerIdx = 0;
2,388✔
1590
  srv->numOfWorkerReady = 0;
2,388✔
1591
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
2,388!
1592
  srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*));
2,388!
1593
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
2,388!
1594
  if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) {
2,388!
1595
    code = terrno;
×
1596
    goto End;
×
1597
  }
1598

1599
  code = uv_loop_init(srv->loop);
2,388✔
1600
  if (code != 0) {
2,388!
1601
    tError("failed to init server since:%s", uv_err_name(code));
×
1602
    code = TSDB_CODE_THIRDPARTY_ERROR;
×
1603
    goto End;
×
1604
  }
1605

1606
  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
2,388!
1607
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1608
    tError("invalid ip/port, %d:%d since %s", srv->ip, srv->port, terrstr());
×
1609
    code = 0;
×
1610
    terrno = 0;
×
1611
  }
1612

1613
  char pipeName[PATH_MAX];
1614

1615
#if defined(WINDOWS) || defined(DARWIN)
1616
  int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
1617
  if (ret != 0) {
1618
    tError("failed to init pipe, errmsg:%s", uv_err_name(ret));
1619
    goto End;
1620
  }
1621
#if defined(WINDOWS)
1622
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
1623
#elif defined(DARWIN)
1624
  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
1625
           taosGetSelfPthreadId());
1626
#endif
1627

1628
  ret = uv_pipe_bind(&srv->pipeListen, pipeName);
1629
  if (ret != 0) {
1630
    tError("failed to bind pipe, errmsg:%s", uv_err_name(ret));
1631
    goto End;
1632
  }
1633

1634
  ret = uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb);
1635
  if (ret != 0) {
1636
    tError("failed to listen pipe, errmsg:%s", uv_err_name(ret));
1637
    goto End;
1638
  }
1639

1640
  for (int i = 0; i < srv->numOfThreads; i++) {
1641
    SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
1642
    thrd->pInst = pInit;
1643
    thrd->quit = false;
1644
    thrd->pInst = pInit;
1645
    thrd->pWhiteList = uvWhiteListCreate();
1646
    if (thrd->pWhiteList == NULL) {
1647
      destroyWorkThrdObj(thrd);
1648
      code = terrno;
1649
      goto End;
1650
    }
1651
    thrd->connRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
1652
    if (thrd->connRefMgt < 0) {
1653
      code = thrd->connRefMgt;
1654
      goto End;
1655
    }
1656

1657
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
1658
    if (srv->pipe[i] == NULL) {
1659
      destroyWorkThrdObj(thrd);
1660
      code = terrno;
1661
      goto End;
1662
    }
1663

1664
    thrd->pipe = &(srv->pipe[i][1]);  // init read
1665
    srv->pThreadObj[i] = thrd;
1666

1667
    if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
1668
      goto End;
1669
    }
1670

1671
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
1672
    if (err == 0) {
1673
      tDebug("success to create worker-thread:%d", i);
1674
    } else {
1675
      // TODO: clear all other resource later
1676
      tError("failed to create worker-thread:%d", i);
1677
      goto End;
1678
    }
1679
    thrd->inited = 1;
1680
  }
1681
#else
1682

1683
  for (int i = 0; i < srv->numOfThreads; i++) {
49,584✔
1684
    SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
47,196!
1685
    if (thrd == NULL) {
47,196!
1686
      code = terrno;
×
1687
      goto End;
×
1688
    }
1689

1690
    thrd->pInst = pInit;
47,196✔
1691
    thrd->quit = false;
47,196✔
1692
    thrd->pInst = pInit;
47,196✔
1693
    thrd->pWhiteList = uvWhiteListCreate();
47,196✔
1694
    if (thrd->pWhiteList == NULL) {
47,196!
1695
      destroyWorkThrdObj(thrd);
×
1696
      code = terrno;
×
1697
      goto End;
×
1698
    }
1699

1700
    thrd->connRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
47,196✔
1701
    if (thrd->connRefMgt < 0) {
47,196!
1702
      code = thrd->connRefMgt;
×
1703
      goto End;
×
1704
    }
1705

1706
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
47,196!
1707
    if (srv->pipe[i] == NULL) {
47,196!
1708
      code = terrno;
×
1709
      goto End;
×
1710
    }
1711

1712
    uv_os_sock_t fds[2];
1713
    if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) {
47,196!
1714
      tError("failed to create pipe, errmsg:%s", uv_err_name(code));
×
1715
      code = TSDB_CODE_THIRDPARTY_ERROR;
×
1716
      goto End;
×
1717
    }
1718

1719
    code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
47,196✔
1720
    if (code != 0) {
47,196!
1721
      tError("failed to init pipe, errmsg:%s", uv_err_name(code));
×
1722
      code = TSDB_CODE_THIRDPARTY_ERROR;
×
1723
      goto End;
×
1724
    }
1725

1726
    code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
47,196✔
1727
    if (code != 0) {
47,196!
1728
      tError("failed to init pipe, errmsg:%s", uv_err_name(code));
×
1729
      code = TSDB_CODE_THIRDPARTY_ERROR;
×
1730
      goto End;
×
1731
    }
1732

1733
    thrd->pipe = &(srv->pipe[i][1]);  // init read
47,196✔
1734
    thrd->fd = fds[0];
47,196✔
1735
    srv->pThreadObj[i] = thrd;
47,196✔
1736

1737
    thrd->inited = 1;
47,196✔
1738
    if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
47,196!
1739
      goto End;
×
1740
    }
1741

1742
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
47,196✔
1743
    if (err == 0) {
47,196!
1744
      tDebug("success to create worker-thread:%d", i);
47,196✔
1745
    } else {
1746
      // TODO: clear all other resource later
1747
      tError("failed to create worker-thread:%d", i);
×
1748
      goto End;
×
1749
    }
1750
  }
1751
#endif
1752

1753
  if ((code = addHandleToAcceptloop(srv)) != 0) {
2,388!
1754
    goto End;
×
1755
  }
1756

1757
  code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
2,388✔
1758
  if (code == 0) {
2,388!
1759
    tDebug("success to create accept-thread");
2,388✔
1760
  } else {
1761
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1762
    tError("failed  to create accept-thread since %s", tstrerror(code));
×
1763

1764
    goto End;
×
1765
    // clear all resource later
1766
  }
1767

1768
  srv->inited = true;
2,388✔
1769
  return srv;
2,388✔
1770
End:
×
1771
  for (int i = 0; i < srv->numOfThreads; i++) {
×
1772
    if (srv->pThreadObj[i] != NULL) {
×
1773
      SWorkThrd* thrd = srv->pThreadObj[i];
×
1774
      destroyWorkThrd(thrd);
×
1775
    }
1776
  }
1777
  transCloseServer(srv);
×
1778
  terrno = code;
×
1779
  return NULL;
×
1780
}
1781

1782
// Handle a Quit message on a worker thread.
// Marks the thread as quitting, then either walks the loop's handles with
// uvWalkCb (connection queue empty) or calls destroyAllConn (connections
// still queued). The message is freed here in both cases.
void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd) {
  thrd->quit = true;

  bool noConn = QUEUE_IS_EMPTY(&thrd->conn);
  if (!noConn) {
    destroyAllConn(thrd);
  } else {
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }

  taosMemoryFree(msg);
}
47,196✔
1791
// Handler for release messages: the body performs no work and does not free msg.
void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) {
  return;
}
×
1792

1793
// Second stage of sending a response: runs on the worker thread, traces the
// connection the message belongs to and hands the message to uvStartSendResp.
void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) {
  tTrace("%s conn:%p, start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);

  // send msg to client
  uvStartSendResp(msg);
}
9,393,820✔
1798

1799
// Record a brokenlink registration for the message's qId in conn->pQTable.
// If an entry already exists for that qId, its message content is released
// with transFreeMsg before the new entry is put.
// Returns the result of taosHashPut (0 on success).
int32_t uvHandleStateReq(SSvrRespMsg* msg) {
  SSvrConn* conn = msg->pConn;
  int64_t   qid = msg->msg.info.qId;
  tDebug("%s conn:%p, start to register brokenlink callback, sid:%" PRId64, transLabel(conn->pInst), conn, qid);

  // release the content held by a previous registration under the same qId
  SSvrRegArg* prev = taosHashGet(conn->pQTable, &qid, sizeof(qid));
  if (prev != NULL) {
    transFreeMsg(prev->msg.pCont);
  }

  SSvrRegArg entry = {.notifyCount = 0, .init = 1, .msg = msg->msg};
  int32_t    code = taosHashPut(conn->pQTable, &qid, sizeof(qid), &entry, sizeof(entry));
  if (code == 0) {
    tDebug("conn:%p, register brokenlink callback succ", conn);
  }
  return code;
}
1815
// Handle a Register message on a worker thread: store the brokenlink
// registration via uvHandleStateReq and free the message wrapper.
// A failed registration is logged instead of being silently dropped.
void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd) {
  SSvrConn* conn = msg->pConn;
  tDebug("%s conn:%p, register brokenlink callback", transLabel(thrd->pInst), conn);

  int32_t code = uvHandleStateReq(msg);
  if (code != 0) {
    tError("%s conn:%p, failed to register brokenlink callback since %s", transLabel(thrd->pInst), conn,
           tstrerror(code));
  }

  // only the wrapper is freed here; msg->msg was copied by value into the table entry
  taosMemoryFree(msg);
}
2,552,792✔
1821

1822
// Handle an Update (ip-white-list) message on a worker thread.
//   - msg->arg == NULL: the white list is disabled for this thread.
//   - otherwise: for every user in the request a SIpWhiteListDual is built and
//     added with uvWhiteListAdd; when all succeed the list version is updated
//     and the white list is enabled.
// The request (when present) and the message are freed before returning.
void uvHandleUpdate(SSvrRespMsg* msg, SWorkThrd* thrd) {
  SUpdateIpWhite* req = msg->arg;
  if (req == NULL) {
    tDebug("ip-white-list disable on trans");
    thrd->enableIpWhiteList = 0;
    taosMemoryFree(msg);
    return;
  }

  int32_t code = 0;
  for (int i = 0; i < req->numOfUser; i++) {
    SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];

    int32_t sz = pUser->numOfRange * sizeof(SIpRange);

    SIpWhiteListDual* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteListDual));
    if (pList == NULL) {
      // capture the error first so the log reports the real failure reason
      code = terrno;
      tError("failed to create ip-white-list since %s", tstrerror(code));
      break;
    }
    pList->num = pUser->numOfRange;
    memcpy(pList->pIpRanges, pUser->pIpDualRanges, sz);

    // NOTE(review): ownership of pList when uvWhiteListAdd fails is not visible here -- confirm it is not leaked
    code = uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
    if (code != 0) {
      break;
    }
  }

  if (code == 0) {
    thrd->pWhiteList->ver = req->ver;
    thrd->enableIpWhiteList = 1;
  } else {
    tError("failed to update ip-white-list since %s", tstrerror(code));
  }

  tFreeSUpdateIpWhiteReq(req);
  taosMemoryFree(req);
  taosMemoryFree(msg);
}
1860

1861
// Free the resources owned by a worker-thread object: its async pool, white
// list, connection ref set, loop memory and finally the object itself.
// A NULL pointer is ignored.
void destroyWorkThrdObj(SWorkThrd* pThrd) {
  if (pThrd != NULL) {
    transAsyncPoolDestroy(pThrd->asyncPool);
    uvWhiteListDestroy(pThrd->pWhiteList);
    taosCloseRef(pThrd->connRefMgt);

    taosMemoryFree(pThrd->loop);
    taosMemoryFree(pThrd);
  }
}
1871
// Stop and destroy one worker thread. A NULL pointer is ignored.
// When the thread was marked inited, a quit message is sent, the thread is
// joined, its loop is released and messages left in the async pool are
// destroyed; the thread object is then freed via destroyWorkThrdObj.
void destroyWorkThrd(SWorkThrd* pThrd) {
  if (pThrd == NULL) {
    return;
  }

  if (pThrd->inited) {
    sendQuitToWorkThrd(pThrd);

    int joinRet = taosThreadJoin(pThrd->thread, NULL);
    if (joinRet != 0) {
      tError("failed to join work-thread");
    }

    SRV_RELEASE_UV(pThrd->loop);
    TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL);
  }

  destroyWorkThrdObj(pThrd);
}
1886
// Post a Quit message to the worker thread's async pool.
// Failures (allocation or enqueue) are logged. When the enqueue fails the
// message is freed here, matching how the other transAsyncSend callers in
// this file release their message on that path.
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
  SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
  if (msg == NULL) {
    tError("failed to send quit msg to work thread since %s", tstrerror(terrno));
    return;
  }
  msg->type = Quit;
  tDebug("server send quit msg to work thread");

  int32_t code = transAsyncSend(pThrd->asyncPool, &msg->q);
  if (code != 0) {
    // the message was not queued, so nobody else will free it
    tError("failed to send quit msg to work thread since %s", tstrerror(code));
    taosMemoryFree(msg);
  }
}
1896

1897
void transCloseServer(void* arg) {
2,388✔
1898
  // impl later
1899
  SServerObj* srv = arg;
2,388✔
1900

1901
  if (srv->inited) {
2,388!
1902
    tDebug("send quit msg to accept thread");
2,388✔
1903
    TAOS_UNUSED(uv_async_send(srv->pAcceptAsync));
2,388✔
1904
    if (taosThreadJoin(srv->thread, NULL) != 0) {
2,388!
1905
      tError("failed to join accept-thread");
×
1906
    }
1907

1908
    SRV_RELEASE_UV(srv->loop);
2,388✔
1909
    for (int i = 0; i < srv->numOfThreads; i++) {
49,584✔
1910
      destroyWorkThrd(srv->pThreadObj[i]);
47,196✔
1911
    }
1912
  } else {
1913
    SRV_RELEASE_UV(srv->loop);
×
1914
  }
1915

1916
  taosMemoryFree(srv->pThreadObj);
2,388!
1917
  taosMemoryFree(srv->pAcceptAsync);
2,388!
1918
  taosMemoryFree(srv->loop);
2,388!
1919

1920
  for (int i = 0; i < srv->numOfThreads; i++) {
49,584✔
1921
    if (srv->pipe[i] != NULL) {
47,196!
1922
      taosMemoryFree(srv->pipe[i]);
47,196!
1923
    }
1924
  }
1925
  taosMemoryFree(srv->pipe);
2,388!
1926

1927
  taosMemoryFree(srv);
2,388!
1928
}
2,388✔
1929

1930
void transRefSrvHandle(void* handle) {
10,555,972✔
1931
  if (handle == NULL) {
10,555,972!
1932
    return;
×
1933
  }
1934
  SSvrConn* pConn = handle;
10,555,972✔
1935
  pConn->ref++;
10,555,972✔
1936
  tTrace("conn:%p, ref count:%d", pConn, pConn->ref);
10,555,972✔
1937
}
1938

1939
void transUnrefSrvHandle(void* handle) {
10,556,728✔
1940
  if (handle == NULL) {
10,556,728!
1941
    return;
×
1942
  }
1943
  SSvrConn* pConn = handle;
10,556,728✔
1944
  pConn->ref--;
10,556,728✔
1945
  tTrace("conn:%p, ref count:%d", pConn, pConn->ref);
10,556,728✔
1946
  if (pConn->ref == 0) {
10,556,728✔
1947
    destroyConn((SSvrConn*)handle, true);
1948
  }
1949
}
1950

1951
int32_t transReleaseSrvHandle(void* handle, int32_t status) {
2,544,848✔
1952
  int32_t         code = 0;
2,544,848✔
1953
  SRpcHandleInfo* info = handle;
2,544,848✔
1954
  SExHandle*      exh = info->handle;
2,544,848✔
1955
  int64_t         qId = info->qId;
2,544,848✔
1956
  int64_t         refId = info->refId;
2,544,848✔
1957

1958
  ASYNC_CHECK_HANDLE(info->refIdMgt, refId, exh);
2,544,848!
1959

1960
  SWorkThrd* pThrd = exh->pThrd;
2,543,551✔
1961
  ASYNC_ERR_JRET(pThrd);
2,543,551!
1962

1963
  STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
2,543,551✔
1964
                    .code = status,
1965
                    .info.handle = exh,
1966
                    .info.ahandle = NULL,
1967
                    .info.refId = refId,
1968
                    .info.qId = qId,
1969
                    .info.traceId = info->traceId};
1970

1971
  SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
2,543,551!
1972
  if (m == NULL) {
2,543,406!
1973
    code = terrno;
×
1974
    goto _return1;
×
1975
  }
1976

1977
  m->msg = tmsg;
2,543,406✔
1978
  m->type = Normal;
2,543,406✔
1979

1980
  tDebug("%s conn:%p, start to send %s, sid:%" PRId64, transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
2,543,406!
1981
         qId);
1982
  if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
2,543,536!
1983
    destroySmsg(m);
1984
    transReleaseExHandle(info->refIdMgt, refId);
×
1985
    return code;
×
1986
  }
1987

1988
  transReleaseExHandle(info->refIdMgt, refId);
2,543,516✔
1989
  return 0;
2,543,526✔
1990
_return1:
1,389✔
1991
  tDebug("handle %p failed to send to release handle", exh);
1,389✔
1992
  transReleaseExHandle(info->refIdMgt, refId);
1,389✔
1993
  return code;
1,389✔
1994
_return2:
×
1995
  tDebug("handle %p failed to send to release handle", exh);
×
1996
  return code;
×
1997
}
1998

1999
int32_t transSendResponse(const STransMsg* msg) {
7,051,362✔
2000
  int32_t code = 0;
7,051,362✔
2001

2002
  if (msg->info.noResp) {
7,051,362✔
2003
    rpcFreeCont(msg->pCont);
199,279✔
2004
    tTrace("no need send resp");
199,281✔
2005
    return 0;
199,281✔
2006
  }
2007
  SExHandle* exh = msg->info.handle;
6,852,083✔
2008

2009
  if (exh == NULL) {
6,852,083✔
2010
    rpcFreeCont(msg->pCont);
102✔
2011
    return 0;
102✔
2012
  }
2013
  int64_t refId = msg->info.refId;
6,851,981✔
2014
  ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh);
6,851,981!
2015

2016
  STransMsg tmsg = *msg;
6,851,917✔
2017
  tmsg.info.refId = refId;
6,851,917✔
2018
  if (tmsg.info.qId == 0) {
6,851,917✔
2019
    tmsg.msgType = msg->info.msgType + 1;
2,545,770✔
2020
  }
2021

2022
  SWorkThrd* pThrd = exh->pThrd;
6,851,917✔
2023
  ASYNC_ERR_JRET(pThrd);
6,851,917!
2024

2025
  SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
6,851,917!
2026
  if (m == NULL) {
6,851,707!
2027
    code = terrno;
×
2028
    goto _return1;
×
2029
  }
2030
  m->msg = tmsg;
6,851,707✔
2031

2032
  m->type = Normal;
6,851,707✔
2033

2034
  STraceId* trace = (STraceId*)&msg->info.traceId;
6,851,707✔
2035
  tGTrace("conn:%p, start to send resp (1/2)", exh->handle);
6,851,707!
2036
  if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
6,851,707!
2037
    destroySmsg(m);
2038
    transReleaseExHandle(msg->info.refIdMgt, refId);
×
2039
    return code;
×
2040
  }
2041

2042
  transReleaseExHandle(msg->info.refIdMgt, refId);
6,851,674✔
2043
  return 0;
6,851,749✔
2044

2045
_return1:
418✔
2046
  tDebug("handle %p failed to send resp", exh);
418✔
2047
  rpcFreeCont(msg->pCont);
418✔
2048
  transReleaseExHandle(msg->info.refIdMgt, refId);
418✔
2049
  return code;
354✔
2050
_return2:
×
2051
  tDebug("handle %p failed to send resp", exh);
×
2052
  rpcFreeCont(msg->pCont);
×
2053
  return code;
×
2054
}
2055
int32_t transRegisterMsg(const STransMsg* msg) {
2,552,342✔
2056
  int32_t code = 0;
2,552,342✔
2057

2058
  SExHandle* exh = msg->info.handle;
2,552,342✔
2059
  int64_t    refId = msg->info.refId;
2,552,342✔
2060
  ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh);
2,552,342!
2061

2062
  STransMsg tmsg = *msg;
2,552,654✔
2063
  tmsg.info.noResp = 1;
2,552,654✔
2064

2065
  tmsg.info.qId = msg->info.qId;
2,552,654✔
2066
  tmsg.info.seqNum = msg->info.seqNum;
2,552,654✔
2067
  tmsg.info.refId = refId;
2,552,654✔
2068
  tmsg.info.refIdMgt = msg->info.refIdMgt;
2,552,654✔
2069

2070
  SWorkThrd* pThrd = exh->pThrd;
2,552,654✔
2071
  ASYNC_ERR_JRET(pThrd);
2,552,654!
2072

2073
  SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
2,552,654!
2074
  if (m == NULL) {
2,552,546!
2075
    code = terrno;
×
2076
    goto _return1;
×
2077
  }
2078

2079
  m->msg = tmsg;
2,552,546✔
2080
  m->type = Register;
2,552,546✔
2081

2082
  STrans* pInst = pThrd->pInst;
2,552,546✔
2083
  tDebug("%s conn:%p, start to register brokenlink callback", transLabel(pInst), exh->handle);
2,552,546✔
2084
  if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
2,552,638!
2085
    destroySmsg(m);
2086
    transReleaseExHandle(msg->info.refIdMgt, refId);
×
2087
    return code;
×
2088
  }
2089

2090
  transReleaseExHandle(msg->info.refIdMgt, refId);
2,552,466✔
2091
  return 0;
2,552,617✔
2092

2093
_return1:
×
2094
  tDebug("handle %p failed to register brokenlink", exh);
×
2095
  rpcFreeCont(msg->pCont);
×
2096
  transReleaseExHandle(msg->info.refIdMgt, refId);
×
2097
  return code;
×
2098
_return2:
×
2099
  tDebug("handle %p failed to register brokenlink", exh);
×
2100
  rpcFreeCont(msg->pCont);
×
2101
  return code;
×
2102
}
2103

2104
// Broadcast an ip-white-list update to every worker thread of the server.
//   thandle: id of the rpc instance, resolved through transGetInstMgt().
//   arg:     SUpdateIpWhite* to clone once per worker thread.
//   func:    not used by this implementation.
// Stops at the first failure; threads that already received the update keep it.
// Returns 0 on success, TSDB_CODE_RPC_MODULE_QUIT when the instance cannot be
// acquired or the async module has quit, otherwise the error code.
int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
  STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
  if (pInst == NULL) {
    return TSDB_CODE_RPC_MODULE_QUIT;
  }

  int32_t code = 0;
  tDebug("ip-white-list update on rpc");

  SServerObj* svrObj = pInst->tcphandle;
  for (int i = 0; i < svrObj->numOfThreads; i++) {
    SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
    if (msg == NULL) {
      code = terrno;
      break;
    }

    // every worker gets its own copy of the request
    SUpdateIpWhite* pReq = NULL;
    if ((code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq)) != 0) {
      taosMemoryFree(msg);
      break;
    }

    msg->type = Update;
    msg->arg = pReq;

    SWorkThrd* pThrd = svrObj->pThreadObj[i];
    code = transAsyncSend(pThrd->asyncPool, &msg->q);
    if (code != 0) {
      if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) {
        code = TSDB_CODE_RPC_MODULE_QUIT;
      }
      tFreeSUpdateIpWhiteReq(pReq);
      taosMemoryFree(pReq);
      taosMemoryFree(msg);
      break;
    }
  }
  transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);

  if (code != 0) {
    tError("ip-white-list update failed since %s", tstrerror(code));
  }
  return code;
}
2148
#else
2149
// TD_ASTRA_RPC variant: only logs the request and reports success.
int32_t transReleaseSrvHandle(void *handle, int32_t status) {
  tDebug("rpc start to release svr handle");

  return 0;
}
2153
// TD_ASTRA_RPC variant: no-op, the handle is not touched.
void transRefSrvHandle(void *handle) {
  (void)handle;
}
2154

2155
// TD_ASTRA_RPC variant: no-op, the handle is not touched.
void transUnrefSrvHandle(void *handle) {
  (void)handle;
}
2156
// TD_ASTRA_RPC variant of sending a response.
// For a request message type (non-zero) msg->msgType is set to the matching
// response type (info.msgType + 1). When no response is wanted the content is
// freed and 0 is returned; otherwise the server version is stored in
// info.cliVer, msg->type is taken from info.connType and the message is passed
// to transSendResp, whose result is returned.
int32_t transSendResponse(STransMsg *msg) {
  if (rpcIsReq(msg->info.msgType) && msg->info.msgType != 0) {
    msg->msgType = msg->info.msgType + 1;
  }

  if (msg->info.noResp) {
    rpcFreeCont(msg->pCont);
    return 0;
  }

  // the conversion result is not checked; svrVer stays 0 if nothing is written to it
  int32_t svrVer = 0;
  int32_t code = taosVersionStrToInt(td_version, &svrVer);

  msg->info.cliVer = svrVer;
  msg->type = msg->info.connType;
  return transSendResp(msg);
}
2171
int32_t transRegisterMsg(const STransMsg *msg) {
2172
  rpcFreeCont(msg->pCont);
2173
  return 0;
2174
}
2175
// TD_ASTRA_RPC variant: arguments are ignored, always returns 0.
int32_t transSetIpWhiteList(void *thandle, void *arg, FilteFunc *func) {
  return 0;
}
2176

2177
// TD_ASTRA_RPC variant: no server object is created, always returns NULL.
void *transInitServer(SIpAddr *pAddr, char *label, int numOfThreads, void *fp, void *pInit) {
  return NULL;
}
2178
// TD_ASTRA_RPC variant: not implemented yet, the argument is left untouched.
void transCloseServer(void *arg) {
  // impl later
  return;
}
2182

2183
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc