• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.25
/source/libs/transport/src/transComm.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "transComm.h"
17
#include "osTime.h"
18
#include "tqueue.h"
19
#include "transLog.h"
20

21
#ifndef TD_ASTRA_RPC
22
#define BUFFER_CAP 8 * 1024
23

24
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
25

26
static int32_t refMgt;
27
static int32_t svrRefMgt;
28
static int32_t instMgt;
29
static int32_t transSyncMsgMgt;
30

31
void transDestroySyncMsg(void* msg);
32

33
int32_t transCompressMsg(char* msg, int32_t len) {
9,496✔
34
  int32_t        ret = 0;
9,496✔
35
  int            compHdr = sizeof(STransCompMsg);
9,496✔
36
  STransMsgHead* pHead = transHeadFromCont(msg);
9,496✔
37

38
  int64_t start = taosGetTimestampMs();
9,497✔
39
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
9,497!
40
  if (buf == NULL) {
9,494!
41
    tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len);
×
42
    ret = len;
×
43
    return ret;
×
44
  }
45

46
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
9,494✔
47
  /*
48
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
49
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
50
   */
51
  if (clen > 0 && clen < len - compHdr) {
9,494!
52
    STransCompMsg* pComp = (STransCompMsg*)msg;
8,825✔
53
    pComp->reserved = 0;
8,825✔
54
    pComp->contLen = htonl(len);
8,825✔
55
    memcpy(msg + compHdr, buf, clen);
8,825✔
56

57
    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
8,825✔
58
    ret = clen + compHdr;
8,826✔
59
    pHead->comp = 1;
8,826✔
60
  } else {
61
    ret = len;
669✔
62
    pHead->comp = 0;
669✔
63
  }
64
  taosMemoryFree(buf);
9,495!
65

66
  int64_t elapse = taosGetTimestampMs() - start;
9,498✔
67
  if (elapse >= 100) {
9,498!
68
    tWarn("compress msg cost %dms", (int)(elapse));
×
69
  }
70
  return ret;
9,498✔
71
}
72
int32_t transDecompressMsg(char** msg, int32_t* len) {
86,958,573✔
73
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
86,958,573✔
74
  if (pHead->comp == 0) return 0;
86,958,573!
75

76
  int64_t start = taosGetTimestampMs();
8,902✔
77

78
  char* pCont = transContFromHead(pHead);
8,902✔
79

80
  STransCompMsg* pComp = (STransCompMsg*)pCont;
8,902✔
81
  int32_t        oriLen = htonl(pComp->contLen);
8,902✔
82

83
  int32_t tlen = *len;
8,902✔
84
  char*   buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
8,902!
85
  if (buf == NULL) {
8,899!
86
    return terrno;
×
87
  }
88

89
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
8,899✔
90
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
8,899✔
91
                                                 tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
8,899✔
92

93
  if (decompLen != oriLen) {
8,901!
94
    taosMemoryFree(buf);
×
95
    return TSDB_CODE_INVALID_MSG;
×
96
  }
97
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
8,901✔
98

99
  *len = oriLen + sizeof(STransMsgHead);
8,901✔
100
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
8,901✔
101

102
  taosMemoryFree(pHead);
8,901!
103
  *msg = buf;
8,903✔
104

105
  int64_t elapse = taosGetTimestampMs() - start;
8,902✔
106
  if (elapse >= 100) {
8,902!
107
    tWarn("dcompress msg cost %dms", (int)(elapse));
×
108
  }
109
  return 0;
8,902✔
110
}
111
int32_t transDecompressMsgExt(char const* msg, int32_t len, char** out, int32_t* outLen) {
7✔
112
  STransMsgHead* pHead = (STransMsgHead*)msg;
7✔
113
  char*          pCont = transContFromHead(pHead);
7✔
114

115
  STransCompMsg* pComp = (STransCompMsg*)pCont;
7✔
116
  int32_t        oriLen = htonl(pComp->contLen);
7✔
117

118
  int32_t tlen = len;
7✔
119
  char*   buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
7!
120
  if (buf == NULL) {
7!
121
    return terrno;
×
122
  }
123
  int64_t start = taosGetTimestampMs();
7✔
124

125
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
7✔
126
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
7✔
127
                                                 tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
7✔
128
  if (decompLen != oriLen) {
7!
129
    tError("msgLen:%d, originLen:%d, decompLen:%d", len, oriLen, decompLen);
×
130
    taosMemoryFree(buf);
×
131
    return TSDB_CODE_INVALID_MSG;
×
132
  }
133
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
7✔
134

135
  *out = buf;
7✔
136
  *outLen = oriLen + sizeof(STransMsgHead);
7✔
137
  pNewHead->msgLen = *outLen;
7✔
138
  pNewHead->comp = 0;
7✔
139

140
  int64_t elapse = taosGetTimestampMs() - start;
7✔
141
  if (elapse >= 100) {
7!
142
    tWarn("dcompress msg cost %dms", (int)(elapse));
×
143
  }
144
  return 0;
7✔
145
}
146

147
void transFreeMsg(void* msg) {
191,291,310✔
148
  if (msg == NULL) {
191,291,310✔
149
    return;
4,198,331✔
150
  }
151
  tTrace("cont:%p, rpc free", (char*)msg - TRANS_MSG_OVERHEAD);
187,092,979✔
152
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
187,092,993!
153
}
154
void transSockInfo2Str(struct sockaddr* sockname, char* dst) {
836,981✔
155
  char     buf[IP_RESERVE_CAP] = {0};
836,981✔
156
  uint16_t port = 0;
836,981✔
157
  if (sockname->sa_family == AF_INET) {
836,981✔
158
    struct sockaddr_in* addr = (struct sockaddr_in*)sockname;
836,972✔
159

160
    int r = uv_ip4_name(addr, (char*)buf, sizeof(buf));
836,972✔
161

162
    port = ntohs(addr->sin_port);
837,570✔
163
  } else if (sockname->sa_family == AF_INET6) {
9!
164
    struct sockaddr_in6* addr = (struct sockaddr_in6*)sockname;
×
165
    uv_ip6_name(addr, buf, sizeof(buf));
×
166
    port = ntohs(addr->sin6_port);
×
167
  }
168
  sprintf(dst, "%s:%d", buf, port);
837,579✔
169
}
837,579✔
170
int32_t transInitBuffer(SConnBuffer* buf) {
511,568✔
171
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
511,568!
172
  if (buf->buf == NULL) {
512,448!
173
    return terrno;
×
174
  }
175

176
  buf->cap = BUFFER_CAP;
512,448✔
177
  buf->left = -1;
512,448✔
178
  buf->len = 0;
512,448✔
179
  buf->total = 0;
512,448✔
180
  buf->invalid = 0;
512,448✔
181
  return 0;
512,448✔
182
}
183
void transDestroyBuffer(SConnBuffer* p) {
512,491✔
184
  taosMemoryFree(p->buf);
512,491!
185
  p->buf = NULL;
512,503✔
186
}
512,503✔
187

188
int32_t transClearBuffer(SConnBuffer* buf) {
×
189
  SConnBuffer* p = buf;
×
190
  if (p->cap > BUFFER_CAP) {
×
191
    p->cap = BUFFER_CAP;
×
192
    p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
×
193
    if (p->buf == NULL) {
×
194
      return terrno;
×
195
    }
196
  }
197
  p->left = -1;
×
198
  p->len = 0;
×
199
  p->total = 0;
×
200
  p->invalid = 0;
×
201
  return 0;
×
202
}
203

204
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
86,913,958✔
205
  static const int HEADSIZE = sizeof(STransMsgHead);
206
  int32_t          code = 0;
86,913,958✔
207
  SConnBuffer*     p = connBuf;
86,913,958✔
208
  if (p->left != 0 || p->total <= 0) {
86,913,958!
209
    return TSDB_CODE_INVALID_MSG;
×
210
  }
211
  int total = p->total;
86,985,750✔
212
  if (total >= HEADSIZE && !p->invalid) {
86,985,750!
213
    *buf = taosMemoryCalloc(1, total);
87,041,484!
214
    if (*buf == NULL) {
87,001,907!
215
      return terrno;
×
216
    }
217
    memcpy(*buf, p->buf, total);
87,001,907✔
218
    if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
87,001,907!
219
      return code;
×
220
    }
221
  } else {
222
    total = -1;
×
223
    return TSDB_CODE_INVALID_MSG;
×
224
  }
225
  return total;
87,039,227✔
226
}
227

228
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
86,999,791✔
229
  SConnBuffer* p = connBuf;
86,999,791✔
230
  if (p->total < p->len) {
86,999,791✔
231
    int left = p->len - p->total;
810,963✔
232
    memmove(p->buf, p->buf + p->total, left);
810,963✔
233
    p->left = -1;
810,963✔
234
    p->total = 0;
810,963✔
235
    p->len = left;
810,963✔
236
  } else if (p->total == p->len) {
86,188,828!
237
    p->left = -1;
86,272,302✔
238
    p->total = 0;
86,272,302✔
239
    p->len = 0;
86,272,302✔
240
    if (p->cap > BUFFER_CAP) {
86,272,302✔
241
      if (resetBuf) {
42,624,216!
242
        p->cap = BUFFER_CAP;
×
243
        p->buf = taosMemoryRealloc(p->buf, p->cap);
×
244
        if (p->buf == NULL) {
×
245
          return terrno;
×
246
        }
247
      }
248
    }
249
  } else {
250
    tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
×
251
    return TSDB_CODE_INVALID_MSG;
×
252
  }
253
  return 0;
87,057,169✔
254
}
255
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
89,564,035✔
256
  /*
257
   * formate of data buffer:
258
   * |<--------------------------data from socket------------------------------->|
259
   * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
260
   * info--->|
261
   */
262
  SConnBuffer* p = connBuf;
89,564,035✔
263
  uvBuf->base = p->buf + p->len;
89,564,035✔
264
  if (p->left == -1) {
89,564,035✔
265
    uvBuf->len = p->cap - p->len;
87,575,394✔
266
  } else {
267
    if (p->left < p->cap - p->len) {
1,988,641✔
268
      uvBuf->len = p->left;
1,591,596✔
269
    } else {
270
      p->cap = p->left + p->len;
397,045✔
271
      p->buf = taosMemoryRealloc(p->buf, p->cap);
397,045!
272
      if (p->buf == NULL) {
473,718!
273
        uvBuf->base = NULL;
×
274
        uvBuf->len = 0;
×
275
        return terrno;
×
276
      }
277
      uvBuf->base = p->buf + p->len;
473,718✔
278
      uvBuf->len = p->left;
473,718✔
279
    }
280
  }
281
  return 0;
89,640,708✔
282
}
283
// check whether already read complete
284
bool transReadComplete(SConnBuffer* connBuf) {
175,092,921✔
285
  SConnBuffer* p = connBuf;
175,092,921✔
286
  if (p->len >= sizeof(STransMsgHead)) {
175,092,921✔
287
    if (p->left == -1) {
89,014,315✔
288
      STransMsgHead head;
289
      memcpy((char*)&head, connBuf->buf, sizeof(head));
87,013,651✔
290
      int32_t msgLen = (int32_t)htonl(head.msgLen);
87,013,651✔
291
      p->total = msgLen;
87,013,651✔
292
      p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER;
87,013,651!
293
    }
294
    if (p->total >= p->len) {
89,014,315✔
295
      p->left = p->total - p->len;
88,355,285✔
296
    } else {
297
      p->left = 0;
659,030✔
298
    }
299
  }
300
  return (p->left == 0 || p->invalid) ? true : false;
175,092,921!
301
}
302

303
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) {
512,007✔
304
#if defined(WINDOWS) || defined(DARWIN)
305
#else
306
  return uv_tcp_keepalive(stream, 1, keepalive);
512,007✔
307
#endif
308
  return uv_tcp_nodelay(stream, 1);
309
  // int ret = uv_tcp_keepalive(stream, 5, 60);
310
}
311

312
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) {
498,365✔
313
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
498,365!
314
  if (pool == NULL) {
498,365!
315
    return terrno;
×
316
    // return NULL;
317
  }
318
  int32_t code = 0;
498,365✔
319

320
  pool->nAsync = sz;
498,365✔
321
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
498,365!
322
  if (pool->asyncs == NULL) {
498,365!
323
    taosMemoryFree(pool);
×
324
    return terrno;
×
325
  }
326

327
  int i = 0, err = 0;
498,365✔
328
  for (i = 0; i < pool->nAsync; i++) {
1,867,131✔
329
    uv_async_t* async = &(pool->asyncs[i]);
1,368,766✔
330

331
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
1,368,766!
332
    if (item == NULL) {
1,368,766!
333
      code = terrno;
×
334
      break;
×
335
    }
336
    item->pThrd = arg;
1,368,766✔
337
    QUEUE_INIT(&item->qmsg);
1,368,766✔
338
    code = taosThreadMutexInit(&item->mtx, NULL);
1,368,766✔
339
    if (code) {
1,368,766!
340
      taosMemoryFree(item);
×
341
      break;
×
342
    }
343

344
    async->data = item;
1,368,766✔
345
    err = uv_async_init(loop, async, cb);
1,368,766✔
346
    if (err != 0) {
1,368,766!
347
      tError("failed to init async since %s", uv_err_name(err));
×
348
      code = TSDB_CODE_THIRDPARTY_ERROR;
×
349
      break;
×
350
    }
351
  }
352

353
  if (i != pool->nAsync) {
498,365!
354
    transAsyncPoolDestroy(pool);
×
355
    pool = NULL;
×
356
  }
357

358
  *pPool = pool;
498,365✔
359
  return 0;
498,365✔
360
  // return pool;
361
}
362

363
void transAsyncPoolDestroy(SAsyncPool* pool) {
498,364✔
364
  if (pool == NULL) return;
498,364!
365

366
  for (int i = 0; i < pool->nAsync; i++) {
1,867,129✔
367
    uv_async_t* async = &(pool->asyncs[i]);
1,368,765✔
368
    SAsyncItem* item = async->data;
1,368,765✔
369
    if (item == NULL) continue;
1,368,765!
370

371
    TAOS_UNUSED(taosThreadMutexDestroy(&item->mtx));
1,368,765✔
372
    taosMemoryFree(item);
1,368,765!
373
  }
374
  taosMemoryFree(pool->asyncs);
498,364!
375
  taosMemoryFree(pool);
498,364!
376
}
377
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
436,345✔
378
  for (int i = 0; i < pool->nAsync; i++) {
1,309,035✔
379
    uv_async_t* async = &(pool->asyncs[i]);
872,690✔
380
    SAsyncItem* item = async->data;
872,690✔
381
    if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
872,690!
382
  }
383
  return true;
436,345✔
384
}
385
int transAsyncSend(SAsyncPool* pool, queue* q) {
95,502,481✔
386
  if (atomic_load_8(&pool->stop) == 1) {
95,502,481!
387
    return TSDB_CODE_RPC_ASYNC_MODULE_QUIT;
×
388
  }
389
  int idx = pool->index % pool->nAsync;
95,466,864✔
390

391
  // no need mutex here
392
  if (pool->index++ > pool->nAsync * 2000) {
95,466,864✔
393
    pool->index = 0;
8,424✔
394
  }
395
  uv_async_t* async = &(pool->asyncs[idx]);
95,466,864✔
396
  SAsyncItem* item = async->data;
95,466,864✔
397

398
  if (taosThreadMutexLock(&item->mtx) != 0) {
95,466,864!
399
    tError("failed to lock mutex since %s", tstrerror(terrno));
×
400
    return terrno;
×
401
  }
402

403
  QUEUE_PUSH(&item->qmsg, q);
95,576,056✔
404
  TAOS_UNUSED(taosThreadMutexUnlock(&item->mtx));
95,576,056✔
405

406
  int ret = uv_async_send(async);
95,589,812✔
407
  if (ret != 0) {
95,549,221!
408
    tError("failed to send async since %s", uv_err_name(ret));
×
409
    return TSDB_CODE_THIRDPARTY_ERROR;
×
410
  }
411
  return 0;
95,549,221✔
412
}
413

414
void transCtxInit(STransCtx* ctx) {
×
415
  // init transCtx
416
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
417
  ctx->brokenVal.val = NULL;
×
418
  ctx->freeFunc = NULL;
×
419
}
×
420
void transCtxCleanup(STransCtx* ctx) {
8,721,874✔
421
  if (ctx == NULL || ctx->args == NULL) {
8,721,874!
422
    return;
5,650,138✔
423
  }
424

425
  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
3,071,736✔
426
  while (iter) {
6,144,236✔
427
    int32_t* type = taosHashGetKey(iter, NULL);
3,072,135✔
428
    tDebug("free msg type %s dump func", TMSG_INFO(*type));
3,071,232!
429
    ctx->freeFunc(iter->val);
3,071,214✔
430
    iter = taosHashIterate(ctx->args, iter);
3,071,900✔
431
  }
432
  if (ctx->freeFunc) ctx->freeFunc(ctx->brokenVal.val);
3,072,101✔
433
  taosHashCleanup(ctx->args);
3,072,087✔
434
  ctx->args = NULL;
3,071,934✔
435
}
436

437
void transCtxMerge(STransCtx* dst, STransCtx* src) {
2,799,677✔
438
  if (src->args == NULL || src->freeFunc == NULL) {
2,799,677!
439
    return;
2,799,660✔
440
  }
441
  SRpcBrokenlinkVal tval = {0};
18✔
442
  void (*freeFunc)(const void* arg) = NULL;
18✔
443

444
  if (dst->args == NULL) {
18✔
445
    dst->args = src->args;
1✔
446
    dst->brokenVal = src->brokenVal;
1✔
447
    dst->freeFunc = src->freeFunc;
1✔
448
    src->args = NULL;
1✔
449
    return;
1✔
450
  } else {
451
    tval = dst->brokenVal;
17✔
452
    freeFunc = dst->freeFunc;
17✔
453

454
    dst->brokenVal = src->brokenVal;
17✔
455
    dst->freeFunc = src->freeFunc;
17✔
456
  }
457

458
  size_t klen = 0;
17✔
459
  void*  iter = taosHashIterate(src->args, NULL);
17✔
460
  while (iter) {
34✔
461
    STransCtxVal* sVal = (STransCtxVal*)iter;
17✔
462
    int32_t*      msgType = taosHashGetKey(sVal, &klen);
17✔
463

464
    STransCtxVal* dVal = taosHashGet(dst->args, msgType, sizeof(*msgType));
17✔
465
    if (dVal != NULL) {
17!
466
      tDebug("free msg type %s dump func", TMSG_INFO(*(int32_t*)msgType));
17!
467
      dst->freeFunc(dVal->val);
17✔
468
      dVal->val = NULL;
17✔
469

470
      TAOS_UNUSED(taosHashRemove(dst->args, msgType, sizeof(*msgType)));
17✔
471
    }
472

473
    int32_t code = taosHashPut(dst->args, msgType, sizeof(*msgType), sVal, sizeof(*sVal));
17✔
474
    if (code != 0) {
17!
475
      tError("failed to put val to hash since %s", tstrerror(code));
×
476
      tDebug("put msg type %s dump func", TMSG_INFO(*(int32_t*)msgType));
×
477
      if (src->freeFunc) (src->freeFunc)(sVal->val);
×
478
      sVal->val = NULL;
×
479
    }
480
    iter = taosHashIterate(src->args, iter);
17✔
481
  }
482
  if (freeFunc != NULL && tval.val != NULL) {
17!
483
    freeFunc(tval.val);
17✔
484
    tval.val = NULL;
17✔
485
  }
486

487
  taosHashCleanup(src->args);
17✔
488
}
489
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
58,939✔
490
  if (ctx->args == NULL) {
58,939!
491
    return NULL;
×
492
  }
493
  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
58,939✔
494
  if (cVal == NULL) {
59,073!
495
    return NULL;
×
496
  }
497
  void* ret = NULL;
59,073✔
498
  TAOS_UNUSED((*cVal->clone)(cVal->val, &ret));
59,073✔
499
  return ret;
58,951✔
500
}
501
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
×
502
  void* ret = NULL;
×
503
  if (ctx->brokenVal.clone == NULL) {
×
504
    return ret;
×
505
  }
506
  TAOS_UNUSED((*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret));
×
507

508
  *msgType = ctx->brokenVal.msgType;
×
509

510
  return ret;
×
511
}
512

513
#if 0
514
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
515
  QUEUE_INIT(&wq->node);
516
  wq->freeFunc = (void (*)(void*))freeFunc;
517
  wq->size = 0;
518
  wq->inited = 1;
519
  return 0;
520
}
521
void transQueuePush(STransQueue* q, void* arg) {
522
  queue* node = arg;
523
  QUEUE_PUSH(&q->node, node);
524
  q->size++;
525
}
526
void* transQueuePop(STransQueue* q) {
527
  if (q->size == 0) return NULL;
528

529
  queue* head = QUEUE_HEAD(&q->node);
530
  QUEUE_REMOVE(head);
531
  q->size--;
532
  return head;
533
}
534
int32_t transQueueSize(STransQueue* q) { return q->size; }
535

536
void* transQueueGet(STransQueue* q, int idx) {
537
  if (q->size == 0) return NULL;
538

539
  while (idx-- > 0) {
540
    queue* node = QUEUE_NEXT(&q->node);
541
    if (node == &q->node) return NULL;
542
  }
543
  return NULL;
544
}
545

546
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size)
547
{
548
  queue* d = dst;
549
  queue* node = QUEUE_NEXT(&q->node);
550
  while (node != &q->node) {
551
    queue* next = QUEUE_NEXT(node);
552
    if (filter && filter(node, arg)) {
553
      QUEUE_REMOVE(node);
554
      q->size--;
555
      QUEUE_PUSH(d, node);
556
      if (--size == 0) {
557
        break;
558
      }
559
    }
560
    node = next;
561
  }
562
}
563

564
void* tranQueueHead(STransQueue* q) {
565
  if (q->size == 0) return NULL;
566

567
  queue* head = QUEUE_HEAD(&q->node);
568
  return head;
569
}
570

571
void* transQueueRm(STransQueue* q, int i) {
572
  // if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
573
  //   return NULL;
574
  // }
575
  // if (i >= taosArrayGetSize(queue->q)) {
576
  //   return NULL;
577
  // }
578
  // void* ptr = taosArrayGetP(queue->q, i);
579
  // taosArrayRemove(queue->q, i);
580
  // return ptr;
581
  return NULL;
582
}
583

584
void transQueueRemove(STransQueue* q, void* e) {
585
  if (q->size == 0) return;
586
  queue* node = e;
587
  QUEUE_REMOVE(node);
588
  q->size--;
589
}
590

591
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
592

593
void transQueueClear(STransQueue* q) {
594
  if (q->inited == 0) return;
595
  while (!QUEUE_IS_EMPTY(&q->node)) {
596
    queue* h = QUEUE_HEAD(&q->node);
597
    QUEUE_REMOVE(h);
598
    if (q->freeFunc != NULL) (q->freeFunc)(h);
599
    q->size--;
600
  }
601
}
602
void transQueueDestroy(STransQueue* q) { transQueueClear(q); }
603
#endif
604

605
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
29,241✔
606
  SDelayTask* arg1 = container_of(a, SDelayTask, node);
29,241✔
607
  SDelayTask* arg2 = container_of(b, SDelayTask, node);
29,241✔
608
  if (arg1->execTime > arg2->execTime) {
29,241✔
609
    return 0;
14,576✔
610
  } else {
611
    return 1;
14,665✔
612
  }
613
}
614

615
static void transDQTimeout(uv_timer_t* timer) {
181,022✔
616
  SDelayQueue* queue = timer->data;
181,022✔
617
  tTrace("timer %p timeout", timer);
181,022✔
618
  uint64_t timeout = 0;
181,022✔
619
  int64_t  current = taosGetTimestampMs();
181,033✔
620
  do {
171,710✔
621
    HeapNode* minNode = heapMin(queue->heap);
352,743✔
622
    if (minNode == NULL) break;
352,736✔
623
    SDelayTask* task = container_of(minNode, SDelayTask, node);
199,495✔
624
    if (task->execTime <= current) {
199,495✔
625
      heapRemove(queue->heap, minNode);
171,722✔
626
      task->func(task->arg);
171,684✔
627
      taosMemoryFree(task);
171,675!
628
      timeout = 0;
171,710✔
629
    } else {
630
      timeout = task->execTime - current;
27,773✔
631
      break;
27,773✔
632
    }
633
  } while (1);
634
  if (timeout != 0) {
181,014✔
635
    TAOS_UNUSED(uv_timer_start(queue->timer, transDQTimeout, timeout, 0));
27,809✔
636
  }
637
}
181,013✔
638
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
872,690✔
639
  int32_t      code = 0;
872,690✔
640
  Heap*        heap = NULL;
872,690✔
641
  uv_timer_t*  timer = NULL;
872,690✔
642
  SDelayQueue* q = NULL;
872,690✔
643

644
  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
872,690!
645
  if (timer == NULL) {
872,690!
646
    return terrno;
×
647
  }
648

649
  heap = heapCreate(timeCompare);
872,690✔
650
  if (heap == NULL) {
872,690!
651
    TAOS_CHECK_GOTO(terrno, NULL, _return1);
×
652
  }
653

654
  q = taosMemoryCalloc(1, sizeof(SDelayQueue));
872,690!
655
  if (q == NULL) {
872,690!
656
    TAOS_CHECK_GOTO(terrno, NULL, _return1);
×
657
  }
658
  q->heap = heap;
872,690✔
659
  q->timer = timer;
872,690✔
660
  q->loop = loop;
872,690✔
661
  q->timer->data = q;
872,690✔
662

663
  int err = uv_timer_init(loop, timer);
872,690✔
664
  if (err != 0) {
872,690!
665
    TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _return1);
×
666
  }
667

668
  *queue = q;
872,690✔
669
  return 0;
872,690✔
670

671
_return1:
×
672
  taosMemoryFree(timer);
×
673
  heapDestroy(heap);
×
674
  taosMemoryFree(q);
×
675
  return TSDB_CODE_OUT_OF_MEMORY;
×
676
}
677

678
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
872,690✔
679
  taosMemoryFree(queue->timer);
872,690!
680

681
  while (heapSize(queue->heap) > 0) {
874,925✔
682
    HeapNode* minNode = heapMin(queue->heap);
2,235✔
683
    if (minNode == NULL) {
2,235!
684
      return;
×
685
    }
686
    heapRemove(queue->heap, minNode);
2,235✔
687

688
    SDelayTask* task = container_of(minNode, SDelayTask, node);
2,235✔
689

690
    STaskArg* arg = task->arg;
2,235✔
691
    if (freeFunc) freeFunc(arg);
2,235!
692
    taosMemoryFree(arg);
2,235!
693

694
    taosMemoryFree(task);
2,235!
695
  }
696
  heapDestroy(queue->heap);
872,690✔
697
  taosMemoryFree(queue);
872,690!
698
}
699
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
26,234✔
700
  TAOS_UNUSED(uv_timer_stop(queue->timer));
26,234✔
701

702
  if (heapSize(queue->heap) <= 0) {
26,232!
703
    taosMemoryFree(task->arg);
×
704
    taosMemoryFree(task);
×
705
    return;
×
706
  }
707
  heapRemove(queue->heap, &task->node);
26,233✔
708

709
  taosMemoryFree(task->arg);
26,236!
710
  taosMemoryFree(task);
26,236!
711

712
  if (heapSize(queue->heap) != 0) {
26,237✔
713
    HeapNode* minNode = heapMin(queue->heap);
4,725✔
714
    if (minNode == NULL) return;
4,723!
715

716
    uint64_t    now = taosGetTimestampMs();
4,725✔
717
    SDelayTask* task = container_of(minNode, SDelayTask, node);
4,725✔
718
    uint64_t    timeout = now > task->execTime ? now - task->execTime : 0;
4,725!
719

720
    TAOS_UNUSED(uv_timer_start(queue->timer, transDQTimeout, timeout, 0));
4,725✔
721
  }
722
}
723

724
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
200,190✔
725
  uint64_t    now = taosGetTimestampMs();
200,192✔
726
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
200,192!
727
  if (task == NULL) {
200,189!
728
    return NULL;
×
729
  }
730

731
  task->func = func;
200,189✔
732
  task->arg = arg;
200,189✔
733
  task->execTime = now + timeoutMs;
200,189✔
734

735
  HeapNode* minNode = heapMin(queue->heap);
200,189✔
736
  if (minNode) {
200,192✔
737
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
23,185✔
738
    if (minTask->execTime < task->execTime) {
23,185✔
739
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
12,260✔
740
    }
741
  }
742

743
  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
200,192✔
744
  heapInsert(queue->heap, &task->node);
200,192✔
745
  TAOS_UNUSED(uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0));
200,189✔
746
  return task;
200,189✔
747
}
748

749
#if 0
750
void transPrintEpSet(SEpSet* pEpSet) {
751
  if (pEpSet == NULL) {
752
    tTrace("NULL epset");
753
    return;
754
  }
755
  char buf[512] = {0};
756
  int  len = tsnprintf(buf, sizeof(buf), "epset:{");
757
  for (int i = 0; i < pEpSet->numOfEps; i++) {
758
    if (i == pEpSet->numOfEps - 1) {
759
      len += tsnprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
760
    } else {
761
      len += tsnprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
762
    }
763
  }
764
  len += tsnprintf(buf + len, sizeof(buf) - len, "}");
765
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
766
}
767
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
768
  if (a == NULL && b == NULL) {
769
    return true;
770
  } else if (a == NULL || b == NULL) {
771
    return false;
772
  }
773

774
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
775
    return false;
776
  }
777
  for (int i = 0; i < a->numOfEps; i++) {
778
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
779
      return false;
780
    }
781
  }
782
  return true;
783
}
784
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
785
  if (a->numOfEps != b->numOfEps) {
786
    return false;
787
  }
788
  for (int i = 0; i < a->numOfEps; i++) {
789
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
790
      return false;
791
    }
792
  }
793
  return true;
794
}
795
#endif
796

797
static void transInitEnv() {
24,489✔
798
  refMgt = transOpenRefMgt(50000, transDestroyExHandle);
24,489✔
799
  svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
24,489✔
800
  instMgt = taosOpenRef(50, rpcCloseImpl);
24,489✔
801
  transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
24,489✔
802
  TAOS_UNUSED(uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"));
24,489✔
803
}
24,489✔
804
static void transDestroyEnv() {
21,214✔
805
  transCloseRefMgt(refMgt);
21,214✔
806
  transCloseRefMgt(svrRefMgt);
21,214✔
807
  transCloseRefMgt(instMgt);
21,214✔
808
  transCloseRefMgt(transSyncMsgMgt);
21,214✔
809
}
21,214✔
810

811
int32_t transInit() {
52,219✔
812
  // init env
813
  int32_t code = taosThreadOnce(&transModuleInit, transInitEnv);
52,219✔
814
  if (code != 0) {
52,219!
815
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
816
  }
817
  return code;
52,219✔
818
}
819

820
int32_t transGetRefMgt() { return refMgt; }
115,673,376✔
821
int32_t transGetSvrRefMgt() { return svrRefMgt; }
×
822
int32_t transGetInstMgt() { return instMgt; }
69,569,852✔
823
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }
842,497✔
824

825
void transCleanup() {
21,214✔
826
  // clean env
827
  transDestroyEnv();
21,214✔
828
}
21,214✔
829
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
110,986✔
830
  // added into once later
831
  return taosOpenRef(size, func);
110,986✔
832
}
833
void transCloseRefMgt(int32_t mgt) {
84,856✔
834
  // close ref
835
  taosCloseRef(mgt);
84,856✔
836
}
84,856✔
837
int64_t transAddExHandle(int32_t refMgt, void* p) {
8,973,907✔
838
  // acquire extern handle
839
  return taosAddRef(refMgt, p);
8,973,907✔
840
}
841
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
8,968,157✔
842
  // acquire extern handle
843
  int32_t code = taosRemoveRef(refMgt, refId);
8,968,157✔
844
  if (code != 0) {
8,973,279✔
845
    tTrace("failed to remove %" PRId64 " from resetId:%d", refId, refMgt);
1!
846
  }
847
}
8,973,279✔
848

849
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {  // acquire extern handle
255,808,622✔
850
  return (void*)taosAcquireRef(refMgt, refId);
255,808,622✔
851
}
852

853
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
254,724,218✔
854
  // release extern handle
855
  int32_t code = taosReleaseRef(refMgt, refId);
254,724,218✔
856
  if (code != 0) {
254,946,055✔
857
    tTrace("failed to release %" PRId64 " from resetId:%d", refId, refMgt);
15,767✔
858
  }
859
}
254,946,055✔
860
void transDestroyExHandle(void* handle) {
8,932,838✔
861
  if (handle == NULL) {
8,932,838!
862
    return;
×
863
  }
864
  SExHandle* eh = handle;
8,932,838✔
865
  tDebug("trans destroy sid:%" PRId64 ", memory %p", eh->refId, handle);
8,932,838✔
866
  taosMemoryFree(handle);
8,932,871!
867
}
868

869
void transDestroySyncMsg(void* msg) {
140,572✔
870
  if (msg == NULL) return;
140,572!
871

872
  STransSyncMsg* pSyncMsg = msg;
140,572✔
873
  TAOS_UNUSED(tsem2_destroy(pSyncMsg->pSem));
140,572✔
874
  taosMemoryFree(pSyncMsg->pSem);
140,569!
875
  transFreeMsg(pSyncMsg->pRsp->pCont);
140,570✔
876
  taosMemoryFree(pSyncMsg->pRsp);
140,575!
877
  taosMemoryFree(pSyncMsg);
140,581!
878
}
879

880
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
×
881
  uint32_t ip = pRange->ip;
×
882
  return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
×
883
}
884
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
×
885
  if (pRange->mask == 32) {
×
886
    pUtils->type = 0;
×
887
    pUtils->address = pRange->ip;
×
888
    return 0;
×
889
  }
890
  pUtils->address = subnetIpRang2Int(pRange);
×
891

892
  for (int i = 0; i < pRange->mask; i++) {
×
893
    pUtils->netmask |= (1 << (31 - i));
×
894
  }
895

896
  pUtils->network = pUtils->address & pUtils->netmask;
×
897
  pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF);
×
898
  pUtils->type = (pRange->mask == 32 ? 0 : 1);
×
899

900
  return 0;
×
901
}
902
int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf) {
×
903
  sprintf(buf, "raw:%s, address:%d, netmask:%d, network:%d, broadcast:%d", pUtils->info, pUtils->address,
×
904
          pUtils->netmask, pUtils->network, pUtils->broadcast);
905
  return 0;
×
906
}
907
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
×
908
  // impl later
909
  if (pUtils == NULL) return false;
×
910
  if (pUtils->type == 0) {
×
911
    return pUtils->address == ip;
×
912
  } else {
913
    SIpV4Range range = {.ip = ip, .mask = 32};
×
914

915
    uint32_t t = subnetIpRang2Int(&range);
×
916
    return t >= pUtils->network && t <= pUtils->broadcast;
×
917
  }
918
}
919

920
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
×
921
  int32_t len = 0;
×
922

923
  struct in_addr addr;
924
  addr.s_addr = pRange->ip;
×
925

926
  int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32);
×
927
  if (err != 0) {
×
928
    tError("failed to convert ip to string since %s", uv_strerror(err));
×
929
    return TSDB_CODE_THIRDPARTY_ERROR;
×
930
  }
931

932
  len = strlen(buf);
×
933

934
  if (pRange->mask != 32) {
×
935
    len += sprintf(buf + len, "/%d", pRange->mask);
×
936
  }
937
  buf[len] = 0;
×
938
  return len;
×
939
}
940

941
int32_t transUtilSWhiteListToStr(SIpWhiteListDual* pList, char** ppBuf) {
120✔
942
  int32_t code = 0;
120✔
943
  int32_t lino = 0;
120✔
944
  char*   pBuf = NULL;
120✔
945
  int32_t len = 0;
120✔
946
  if (pList->num == 0) {
120!
947
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _error);
×
948
  }
949

950
  pBuf = taosMemoryCalloc(1, pList->num * IP_RESERVE_CAP);
120!
951
  if (pBuf == NULL) {
118!
952
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
953
  }
954

955
  for (int i = 0; i < pList->num; i++) {
474✔
956
    SIpRange* pRange = &pList->pIpRanges[i];
357✔
957
    SIpAddr   addr = {0};
357✔
958
    code = tIpUintToStr(pRange, &addr);
357✔
959
    TSDB_CHECK_CODE(code, lino, _error);
356!
960

961
    len += sprintf(pBuf + len, "%s,", IP_ADDR_STR(&addr));
356✔
962
  }
963
  if (len > 0) {
117!
964
    pBuf[len - 1] = 0;
120✔
965
  }
966

967
  *ppBuf = pBuf;
117✔
968
_error:
117✔
969
  if (code != 0) {
117!
970
    taosMemoryFree(pBuf);
×
971
    *ppBuf = NULL;
×
972
  }
973

974
  return len;
117✔
975
}
976

977
bool transUtilCheckDualIp(SIpRange* range, SIpRange* ip) {
×
978
  SIpV6Range* p6 = &range->ipV6;
×
979
  SIpV6Range* pIp = &ip->ipV6;
×
980

981
  if (p6->mask == 0) {
×
982
    return true;
×
983
  } else if (p6->mask == 128) {
×
984
    return p6->addr[0] == pIp->addr[0] && p6->addr[1] == pIp->addr[1];
×
985
  }
986

987
  uint64_t maskHigh = 0, maskLow = 0;
×
988
  if (p6->mask <= 64) {
×
989
    maskHigh = (0xFFFFFFFFFFFFFFFFULL << (64 - p6->mask));
×
990
    maskLow = 0;
×
991
  } else {
992
    maskHigh = 0xFFFFFFFFFFFFFFFFULL;
×
993
    maskLow = (0xFFFFFFFFFFFFFFFFULL << (128 - p6->mask));
×
994
  }
995

996
  return ((pIp->addr[0] & maskHigh) == (p6->addr[0] & maskHigh)) &&
×
997
         ((pIp->addr[1] & maskLow) == (p6->addr[1] & maskLow));
×
998
}
999

1000
int32_t initWQ(queue* wq) {
511,631✔
1001
  int32_t code = 0;
511,631✔
1002
  QUEUE_INIT(wq);
511,631✔
1003
  for (int i = 0; i < 4; i++) {
2,561,105✔
1004
    SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
2,048,573!
1005
    if (w == NULL) {
2,049,474!
1006
      TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1007
    }
1008
    w->wreq.data = w;
2,049,474✔
1009
    w->arg = NULL;
2,049,474✔
1010
    QUEUE_INIT(&w->node);
2,049,474✔
1011
    QUEUE_PUSH(wq, &w->q);
2,049,474✔
1012
  }
1013
  return 0;
512,532✔
1014
_exception:
×
1015
  destroyWQ(wq);
×
1016
  return code;
×
1017
}
1018
void destroyWQ(queue* wq) {
512,500✔
1019
  while (!QUEUE_IS_EMPTY(wq)) {
2,563,128✔
1020
    queue* h = QUEUE_HEAD(wq);
2,050,626✔
1021
    QUEUE_REMOVE(h);
2,050,626✔
1022
    SWReqsWrapper* w = QUEUE_DATA(h, SWReqsWrapper, q);
2,050,626✔
1023
    taosMemoryFree(w);
2,050,626!
1024
  }
1025
}
512,502✔
1026

1027
uv_write_t* allocWReqFromWQ(queue* wq, void* arg) {
89,029,803✔
1028
  if (!QUEUE_IS_EMPTY(wq)) {
89,029,803!
1029
    queue* node = QUEUE_HEAD(wq);
89,064,116✔
1030
    QUEUE_REMOVE(node);
89,064,116✔
1031
    SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q);
89,064,116✔
1032
    w->arg = arg;
89,064,116✔
1033
    QUEUE_INIT(&w->node);
89,064,116✔
1034

1035
    return &w->wreq;
89,064,116✔
1036
  } else {
1037
    SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
×
1038
    if (w == NULL) {
769!
1039
      return NULL;
×
1040
    }
1041
    w->wreq.data = w;
769✔
1042
    w->arg = arg;
769✔
1043
    QUEUE_INIT(&w->node);
769✔
1044
    return &w->wreq;
769✔
1045
  }
1046
}
1047

1048
void freeWReqToWQ(queue* wq, SWReqsWrapper* w) {
89,059,437✔
1049
  QUEUE_INIT(&w->node);
89,059,437✔
1050
  QUEUE_PUSH(wq, &w->q);
89,059,437✔
1051
}
89,059,437✔
1052

1053
int32_t transSetReadOption(uv_handle_t* handle) {
89,491,679✔
1054
  int32_t code = 0;
89,491,679✔
1055
  int32_t fd;
1056
  int     ret = uv_fileno((uv_handle_t*)handle, &fd);
89,491,679✔
1057
  if (ret != 0) {
89,526,491!
1058
    tWarn("failed to get fd since %s", uv_err_name(ret));
×
1059
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1060
  }
1061
  code = taosSetSockOpt2(fd);
89,526,491✔
1062
  return code;
89,472,185✔
1063
}
1064

1065
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
58,093,095✔
1066
  if (pEpset == NULL) {
58,093,095!
1067
    return TSDB_CODE_INVALID_PARA;
×
1068
  }
1069

1070
  if (pReqEpSet == NULL) {
58,093,095!
1071
    return TSDB_CODE_INVALID_PARA;
×
1072
  }
1073

1074
  int32_t    size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
58,093,095✔
1075
  SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
58,093,095!
1076
  if (pReq == NULL) {
58,103,470!
1077
    return TSDB_CODE_OUT_OF_MEMORY;
×
1078
  }
1079
  memcpy((char*)pReq, (char*)pEpset, size);
58,103,470✔
1080
  // clear previous
1081
  taosMemoryFree(*pReqEpSet);
58,103,470!
1082

1083
  if (transValidReqEpset(pReq) != TSDB_CODE_SUCCESS) {
58,134,928!
1084
    taosMemoryFree(pReq);
×
1085
    return TSDB_CODE_INVALID_PARA;
×
1086
  }
1087

1088
  *pReqEpSet = pReq;
58,133,999✔
1089
  return TSDB_CODE_SUCCESS;
58,133,999✔
1090
}
1091

1092
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
1,458,648✔
1093
  if (pReqEpSet == NULL) {
1,458,648!
1094
    return TSDB_CODE_INVALID_PARA;
×
1095
  }
1096
  memcpy((char*)pEpSet, (char*)pReqEpSet, sizeof(SReqEpSet) + sizeof(SEp) * pReqEpSet->numOfEps);
1,458,648✔
1097
  return TSDB_CODE_SUCCESS;
1,458,648✔
1098
}
1099

1100
int32_t transValidReqEpset(SReqEpSet* pReqEpSet) {
58,096,389✔
1101
  if (pReqEpSet == NULL) {
58,096,389!
1102
    return TSDB_CODE_INVALID_PARA;
×
1103
  }
1104
  if (pReqEpSet->numOfEps == 0 || pReqEpSet->numOfEps > TSDB_MAX_EP_NUM || pReqEpSet->inUse >= TSDB_MAX_EP_NUM) {
58,096,389!
1105
    return TSDB_CODE_INVALID_PARA;
×
1106
  }
1107
  return TSDB_CODE_SUCCESS;
58,145,051✔
1108
}
1109

1110
#else
1111
#define BUFFER_CAP 4096
1112

1113
typedef struct {
1114
  int32_t      numOfThread;
1115
  STaosQueue** qhandle;
1116
  STaosQset**  qset;
1117
  int64_t      idx;
1118

1119
} MultiThreadQhandle;
1120
typedef struct TThread {
1121
  TdThread thread;
1122
  int      idx;
1123
} TThread;
1124

1125
TdThreadMutex       mutex[2];
1126
MultiThreadQhandle* multiQ[2] = {NULL, NULL};
1127
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
1128

1129
static int32_t refMgt;
1130
static int32_t svrRefMgt;
1131
static int32_t instMgt;
1132
static int32_t transSyncMsgMgt;
1133
TdThreadMutex  mutex[2];
1134

1135
TdThreadMutex tableMutex;
1136
SHashObj*     hashTable = NULL;
1137

1138
void transDestroySyncMsg(void* msg);
1139

1140
int32_t transCompressMsg(char* msg, int32_t len) {
1141
  int32_t        ret = 0;
1142
  int            compHdr = sizeof(STransCompMsg);
1143
  STransMsgHead* pHead = transHeadFromCont(msg);
1144

1145
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
1146
  if (buf == NULL) {
1147
    tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len);
1148
    ret = len;
1149
    return ret;
1150
  }
1151

1152
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
1153
  /*
1154
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
1155
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
1156
   */
1157
  if (clen > 0 && clen < len - compHdr) {
1158
    STransCompMsg* pComp = (STransCompMsg*)msg;
1159
    pComp->reserved = 0;
1160
    pComp->contLen = htonl(len);
1161
    memcpy(msg + compHdr, buf, clen);
1162

1163
    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
1164
    ret = clen + compHdr;
1165
    pHead->comp = 1;
1166
  } else {
1167
    ret = len;
1168
    pHead->comp = 0;
1169
  }
1170
  taosMemoryFree(buf);
1171
  return ret;
1172
}
1173
int32_t transDecompressMsg(char** msg, int32_t* len) { return 0; }
1174

1175
void transFreeMsg(void* msg) {
1176
  if (msg == NULL) {
1177
    return;
1178
  }
1179
  tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD);
1180
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
1181
}
1182

1183
void transCtxInit(STransCtx* ctx) {
1184
  // init transCtx
1185
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
1186
}
1187
void transCtxCleanup(STransCtx* ctx) {
1188
  if (ctx == NULL || ctx->args == NULL) {
1189
    return;
1190
  }
1191

1192
  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
1193
  while (iter) {
1194
    ctx->freeFunc(iter->val);
1195
    iter = taosHashIterate(ctx->args, iter);
1196
  }
1197
  if (ctx->freeFunc) ctx->freeFunc(ctx->brokenVal.val);
1198
  taosHashCleanup(ctx->args);
1199
  ctx->args = NULL;
1200
}
1201

1202
void transCtxMerge(STransCtx* dst, STransCtx* src) {
1203
  if (src->args == NULL || src->freeFunc == NULL) {
1204
    return;
1205
  }
1206
  if (dst->args == NULL) {
1207
    dst->args = src->args;
1208
    dst->brokenVal = src->brokenVal;
1209
    dst->freeFunc = src->freeFunc;
1210
    src->args = NULL;
1211
    return;
1212
  }
1213
  void*  key = NULL;
1214
  size_t klen = 0;
1215
  void*  iter = taosHashIterate(src->args, NULL);
1216
  while (iter) {
1217
    STransCtxVal* sVal = (STransCtxVal*)iter;
1218
    key = taosHashGetKey(sVal, &klen);
1219

1220
    int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
1221
    if (code != 0) {
1222
      tError("failed to put val to hash, reason:%s", tstrerror(code));
1223
    }
1224
    iter = taosHashIterate(src->args, iter);
1225
  }
1226
  taosHashCleanup(src->args);
1227
}
1228
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
1229
  if (ctx->args == NULL) {
1230
    return NULL;
1231
  }
1232
  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
1233
  if (cVal == NULL) {
1234
    return NULL;
1235
  }
1236
  void* ret = NULL;
1237
  TAOS_UNUSED((*cVal->clone)(cVal->val, &ret));
1238
  return ret;
1239
}
1240
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
1241
  void* ret = NULL;
1242
  if (ctx->brokenVal.clone == NULL) {
1243
    return ret;
1244
  }
1245
  TAOS_UNUSED((*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret));
1246

1247
  *msgType = ctx->brokenVal.msgType;
1248

1249
  return ret;
1250
}
1251

1252
bool cliMayGetAhandle(STrans* pTrans, SRpcMsg* pMsg) {
1253
  int64_t  seq = pMsg->info.seq;
1254
  int32_t* msgType = NULL;
1255

1256
  if (pMsg->msgType == TDMT_SCH_TASK_RELEASE || pMsg->msgType == TDMT_SCH_TASK_RELEASE + 1) {
1257
    STransCtx* ctx = taosHashGet(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
1258
    transCtxCleanup(ctx);
1259
    taosHashRemove(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
1260
    return true;
1261
  }
1262
  taosThreadMutexLock(&pTrans->seqMutex);
1263
  msgType = taosHashGet(pTrans->seqTable, &seq, sizeof(seq));
1264
  taosThreadMutexUnlock(&pTrans->seqMutex);
1265
  if (msgType == NULL) {
1266
    STransCtx* ctx = taosHashGet(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
1267
    if (ctx == NULL) {
1268
      return false;
1269
    }
1270
    pMsg->info.ahandle = transCtxDumpVal(ctx, pMsg->msgType);
1271
    tError("failed to find msg type for seq:%" PRId64 ", gen ahandle for type %s", seq, TMSG_INFO(pMsg->msgType));
1272
  } else {
1273
    taosThreadMutexLock(&pTrans->seqMutex);
1274
    taosHashRemove(pTrans->seqTable, &seq, sizeof(seq));
1275
    msgType = taosHashGet(pTrans->seqTable, &seq, sizeof(seq));
1276
    taosThreadMutexUnlock(&pTrans->seqMutex);
1277
  }
1278
  return true;
1279
}
1280

1281
void* processSvrMsg(void* arg) {
1282
  TThread* thread = (TThread*)arg;
1283

1284
  int32_t    idx = thread->idx;
1285
  static int num = 0;
1286
  STaosQall* qall;
1287
  SRpcMsg *  pRpcMsg, rpcMsg;
1288
  int        type;
1289
  SQueueInfo qinfo = {0};
1290

1291
  taosAllocateQall(&qall);
1292

1293
  while (1) {
1294
    int numOfMsgs = taosReadAllQitemsFromQset(multiQ[1]->qset[idx], qall, &qinfo);
1295
    if (numOfMsgs <= 0) break;
1296
    taosResetQitems(qall);
1297
    for (int i = 0; i < numOfMsgs; i++) {
1298
      taosGetQitem(qall, (void**)&pRpcMsg);
1299
      taosThreadMutexLock(&mutex[1]);
1300
      RpcCfp    fp = NULL;
1301
      void*     parent = NULL;
1302
      STraceId* trace = &pRpcMsg->info.traceId;
1303
      tGDebug("taos %s received from taosd", TMSG_INFO(pRpcMsg->msgType));
1304
      STrans* pTrans = NULL;
1305
      transGetCb(pRpcMsg->type, &pTrans);
1306

1307
      taosThreadMutexUnlock(&mutex[1]);
1308

1309
      if (pTrans != NULL) {
1310
        if (cliMayGetAhandle(pTrans, pRpcMsg)) {
1311
          if (pRpcMsg->info.reqWithSem == NULL) {
1312
            (pTrans->cfp)(pTrans->parent, pRpcMsg, NULL);
1313
          } else {
1314
            STransReqWithSem* reqWithSem = pRpcMsg->info.reqWithSem;
1315
            memcpy(&reqWithSem->pMsg, pRpcMsg, sizeof(SRpcMsg));
1316
            tsem_post(reqWithSem->sem);
1317
          }
1318
        } else {
1319
          tDebug("taosd %s received from taosd, ignore", TMSG_INFO(pRpcMsg->msgType));
1320
        }
1321
      }
1322
      taosFreeQitem(pRpcMsg);
1323
    }
1324
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
1325
  }
1326

1327
  taosFreeQall(qall);
1328
  return NULL;
1329
}
1330
void* procClientMsg(void* arg) {
1331
  TThread* thread = (TThread*)arg;
1332

1333
  int32_t    idx = thread->idx;
1334
  static int num = 0;
1335
  STaosQall* qall;
1336
  SRpcMsg *  pRpcMsg, rpcMsg;
1337
  int        type;
1338
  SQueueInfo qinfo = {0};
1339

1340
  taosAllocateQall(&qall);
1341

1342
  while (1) {
1343
    int numOfMsgs = taosReadAllQitemsFromQset(multiQ[0]->qset[idx], qall, &qinfo);
1344
    tDebug("%d msgs are received", numOfMsgs);
1345
    if (numOfMsgs <= 0) break;
1346
    taosResetQitems(qall);
1347
    for (int i = 0; i < numOfMsgs; i++) {
1348
      taosGetQitem(qall, (void**)&pRpcMsg);
1349

1350
      STraceId* trace = &pRpcMsg->info.traceId;
1351
      tDebug("taosc %s received from taosc", TMSG_INFO(pRpcMsg->msgType));
1352
      RpcCfp fp = NULL;
1353
      // void*  parent;
1354
      STrans* pTrans = NULL;
1355
      taosThreadMutexLock(&mutex[1]);
1356
      if ((pRpcMsg->type & TD_ASTRA_DSVR) != 0) {
1357
        transGetCb(TD_ASTRA_DSVR, &pTrans);
1358
      }
1359
      taosThreadMutexUnlock(&mutex[1]);
1360
      if (pTrans->cfp != NULL) {
1361
        (pTrans->cfp)(pTrans->parent, pRpcMsg, NULL);
1362
      } else {
1363
        tError("taosc failed to find callback for msg type:%s", TMSG_INFO(pRpcMsg->msgType));
1364
      }
1365
      taosFreeQitem(pRpcMsg);
1366
    }
1367
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
1368
  }
1369

1370
  taosFreeQall(qall);
1371
  return NULL;
1372
}
1373
static void transInitEnv() {
1374
  refMgt = transOpenRefMgt(50000, transDestroyExHandle);
1375
  svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
1376
  instMgt = taosOpenRef(50, rpcCloseImpl);
1377
  transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
1378

1379
  taosThreadMutexInit(&tableMutex, NULL);
1380
  hashTable = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
1381

1382
  int32_t numOfAthread = 1;
1383

1384
  multiQ[0] = taosMemoryMalloc(sizeof(MultiThreadQhandle));
1385
  multiQ[0]->numOfThread = numOfAthread;
1386
  multiQ[0]->qhandle = (STaosQueue**)taosMemoryMalloc(sizeof(STaosQueue*) * numOfAthread);
1387
  multiQ[0]->qset = (STaosQset**)taosMemoryMalloc(sizeof(STaosQset*) * numOfAthread);
1388

1389
  taosThreadMutexInit(&mutex[0], NULL);
1390

1391
  for (int i = 0; i < numOfAthread; i++) {
1392
    taosOpenQueue(&(multiQ[0]->qhandle[i]));
1393
    taosOpenQset(&multiQ[0]->qset[i]);
1394
    taosAddIntoQset(multiQ[0]->qset[i], multiQ[0]->qhandle[i], NULL);
1395
  }
1396
  {
1397
    TThread* threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
1398
    for (int i = 0; i < numOfAthread; i++) {
1399
      threads[i].idx = i;
1400
      taosThreadCreate(&(threads[i].thread), NULL, procClientMsg, (void*)&threads[i]);
1401
    }
1402
  }
1403

1404
  multiQ[1] = taosMemoryMalloc(sizeof(MultiThreadQhandle));
1405
  multiQ[1]->numOfThread = numOfAthread;
1406
  multiQ[1]->qhandle = (STaosQueue**)taosMemoryMalloc(sizeof(STaosQueue*) * numOfAthread);
1407
  multiQ[1]->qset = (STaosQset**)taosMemoryMalloc(sizeof(STaosQset*) * numOfAthread);
1408
  taosThreadMutexInit(&mutex[1], NULL);
1409

1410
  for (int i = 0; i < numOfAthread; i++) {
1411
    taosOpenQueue(&(multiQ[1]->qhandle[i]));
1412
    taosOpenQset(&multiQ[1]->qset[i]);
1413
    taosAddIntoQset(multiQ[1]->qset[i], multiQ[1]->qhandle[i], NULL);
1414
  }
1415
  {
1416
    TThread* threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
1417
    for (int i = 0; i < numOfAthread; i++) {
1418
      threads[i].idx = i;
1419
      taosThreadCreate(&(threads[i].thread), NULL, processSvrMsg, (void*)&threads[i]);
1420
    }
1421
  }
1422
}
1423
static void transDestroyEnv() {
1424
  transCloseRefMgt(refMgt);
1425
  transCloseRefMgt(svrRefMgt);
1426
}
1427

1428
typedef struct {
1429
  void (*fp)(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
1430
  RPC_TYPE type;
1431
  void*    parant;
1432
  STrans*  pTransport;
1433
} FP_TYPE;
1434
int32_t transUpdateCb(RPC_TYPE type, STrans* pTransport) {
1435
  taosThreadMutexLock(&tableMutex);
1436

1437
  FP_TYPE t = {.type = type, .pTransport = pTransport};
1438
  int32_t code = taosHashPut(hashTable, &type, sizeof(type), &t, sizeof(t));
1439
  taosThreadMutexUnlock(&tableMutex);
1440
  return 0;
1441
}
1442
int32_t transGetCb(RPC_TYPE type, STrans** ppTransport) {
1443
  taosThreadMutexLock(&tableMutex);
1444
  void* p = taosHashGet(hashTable, &type, sizeof(type));
1445
  if (p == NULL) {
1446
    taosThreadMutexUnlock(&tableMutex);
1447
    return TSDB_CODE_INVALID_MSG;
1448
  }
1449
  FP_TYPE* t = p;
1450
  *ppTransport = t->pTransport;
1451
  // *fp = t->fp;
1452
  // *arg = t->parant;
1453
  taosThreadMutexUnlock(&tableMutex);
1454
  return 0;
1455
}
1456

1457
int32_t transSendReq(STrans* pTransport, SRpcMsg* pMsg, void* pEpSet) {
1458
  SRpcMsg* pTemp;
1459

1460
  taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void**)&pTemp);
1461
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));
1462

1463
  int64_t cidx = multiQ[0]->idx++;
1464
  int32_t idx = cidx % (multiQ[0]->numOfThread);
1465
  tDebug("taos request is sent , type:%s, contLen:%d, item:%p", TMSG_INFO(pMsg->msgType), pMsg->contLen, pTemp);
1466
  taosWriteQitem(multiQ[0]->qhandle[idx], pTemp);
1467
  return 0;
1468
}
1469
int32_t transSendResp(const SRpcMsg* pMsg) {
1470
  SRpcMsg* pTemp;
1471

1472
  taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void**)&pTemp);
1473
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));
1474

1475
  int64_t cidx = multiQ[1]->idx++;
1476
  int32_t idx = cidx % (multiQ[1]->numOfThread);
1477
  tDebug("taos resp is sent to, type:%s, contLen:%d, item:%p", TMSG_INFO(pMsg->msgType), pMsg->contLen, pTemp);
1478
  taosWriteQitem(multiQ[1]->qhandle[idx], pTemp);
1479
  return 0;
1480
}
1481

1482
int32_t transInit() {
1483
  // init env
1484
  int32_t code = taosThreadOnce(&transModuleInit, transInitEnv);
1485
  if (code != 0) {
1486
    code = TAOS_SYSTEM_ERROR(ERRNO);
1487
  }
1488
  return code;
1489
}
1490

1491
int32_t transGetRefMgt() { return refMgt; }
1492
int32_t transGetSvrRefMgt() { return svrRefMgt; }
1493
int32_t transGetInstMgt() { return instMgt; }
1494
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }
1495

1496
void transCleanup() {
1497
  // clean env
1498
  transDestroyEnv();
1499
  return;
1500
}
1501
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
1502
  /// add later
1503
  return taosOpenRef(size, func);
1504
}
1505
void transCloseRefMgt(int32_t mgt) {
1506
  // close ref
1507
  taosCloseRef(mgt);
1508
  return;
1509
}
1510
int64_t transAddExHandle(int32_t refMgt, void* p) {
1511
  return taosAddRef(refMgt, p);
1512
  // acquire extern handle
1513
}
1514
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
1515
  // acquire extern handle
1516
  int32_t code = taosRemoveRef(refMgt, refId);
1517
  return;
1518
}
1519

1520
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
1521
  // acquire extern handle
1522
  return (void*)taosAcquireRef(refMgt, refId);
1523
}
1524

1525
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
1526
  // release extern handle
1527
  int32_t code = taosReleaseRef(refMgt, refId);
1528
  return;
1529
}
1530
void transDestroyExHandle(void* handle) {
1531
  if (handle == NULL) {
1532
    return;
1533
  }
1534
  SExHandle* eh = handle;
1535
  if (!QUEUE_IS_EMPTY(&eh->q)) {
1536
    tDebug("handle %p mem leak", handle);
1537
  }
1538
  tDebug("free exhandle %p", handle);
1539
  taosMemoryFree(handle);
1540
  return;
1541
}
1542

1543
void transDestroySyncMsg(void* msg) {
1544
  if (msg == NULL) return;
1545

1546
  STransSyncMsg* pSyncMsg = msg;
1547
  TAOS_UNUSED(tsem2_destroy(pSyncMsg->pSem));
1548
  taosMemoryFree(pSyncMsg->pSem);
1549
  transFreeMsg(pSyncMsg->pRsp->pCont);
1550
  taosMemoryFree(pSyncMsg->pRsp);
1551
  taosMemoryFree(pSyncMsg);
1552
  return;
1553
}
1554

1555
uint32_t subnetIpRang2Int(SIpV4Range* pRange) { return 0; }
1556
int32_t  subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) { return 0; }
1557
int32_t  subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf) { return 0; }
1558
int32_t  subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) { return 0; }
1559

1560
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return 0; }
1561

1562
int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { return 0; }
1563

1564
int32_t transInitBuffer(SConnBuffer* buf) {
1565
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
1566
  if (buf->buf == NULL) {
1567
    return terrno;
1568
  }
1569

1570
  buf->cap = BUFFER_CAP;
1571
  buf->left = -1;
1572
  buf->len = 0;
1573
  buf->total = 0;
1574
  buf->invalid = 0;
1575
  return 0;
1576
}
1577
void transDestroyBuffer(SConnBuffer* p) {
1578
  taosMemoryFree(p->buf);
1579
  p->buf = NULL;
1580
}
1581

1582
int32_t transClearBuffer(SConnBuffer* buf) {
1583
  SConnBuffer* p = buf;
1584
  if (p->cap > BUFFER_CAP) {
1585
    p->cap = BUFFER_CAP;
1586
    p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
1587
    if (p->buf == NULL) {
1588
      return terrno;
1589
    }
1590
  }
1591
  p->left = -1;
1592
  p->len = 0;
1593
  p->total = 0;
1594
  p->invalid = 0;
1595
  return 0;
1596
}
1597

1598
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
1599
  static const int HEADSIZE = sizeof(STransMsgHead);
1600
  int32_t          code = 0;
1601
  SConnBuffer*     p = connBuf;
1602
  if (p->left != 0 || p->total <= 0) {
1603
    return TSDB_CODE_INVALID_MSG;
1604
  }
1605
  int total = p->total;
1606
  if (total >= HEADSIZE && !p->invalid) {
1607
    *buf = taosMemoryCalloc(1, total);
1608
    if (*buf == NULL) {
1609
      return terrno;
1610
    }
1611
    memcpy(*buf, p->buf, total);
1612
    if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
1613
      return code;
1614
    }
1615
  } else {
1616
    total = -1;
1617
    return TSDB_CODE_INVALID_MSG;
1618
  }
1619
  return total;
1620
}
1621

1622
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
1623
  SConnBuffer* p = connBuf;
1624
  if (p->total < p->len) {
1625
    int left = p->len - p->total;
1626
    memmove(p->buf, p->buf + p->total, left);
1627
    p->left = -1;
1628
    p->total = 0;
1629
    p->len = left;
1630
  } else if (p->total == p->len) {
1631
    p->left = -1;
1632
    p->total = 0;
1633
    p->len = 0;
1634
    if (p->cap > BUFFER_CAP) {
1635
      if (resetBuf) {
1636
        p->cap = BUFFER_CAP;
1637
        p->buf = taosMemoryRealloc(p->buf, p->cap);
1638
        if (p->buf == NULL) {
1639
          return terrno;
1640
        }
1641
      }
1642
    }
1643
  } else {
1644
    tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
1645
    return TSDB_CODE_INVALID_MSG;
1646
  }
1647
  return 0;
1648
}
1649

1650
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
1651
  if (pEpset == NULL) {
1652
    return TSDB_CODE_INVALID_PARA;
1653
  }
1654

1655
  if (pReqEpSet == NULL) {
1656
    return TSDB_CODE_INVALID_PARA;
1657
  }
1658

1659
  int32_t    size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
1660
  SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
1661
  if (pReq == NULL) {
1662
    return TSDB_CODE_OUT_OF_MEMORY;
1663
  }
1664
  memcpy((char*)pReq, (char*)pEpset, size);
1665
  // clear previous
1666
  taosMemoryFree(*pReqEpSet);
1667

1668
  if (transValidReqEpset(pReq) != TSDB_CODE_SUCCESS) {
1669
    taosMemoryFree(pReq);
1670
    return TSDB_CODE_INVALID_PARA;
1671
  }
1672

1673
  *pReqEpSet = pReq;
1674
  return TSDB_CODE_SUCCESS;
1675
}
1676

1677
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
1678
  if (pReqEpSet == NULL) {
1679
    return TSDB_CODE_INVALID_PARA;
1680
  }
1681
  memcpy((char*)pEpSet, (char*)pReqEpSet, sizeof(SReqEpSet) + sizeof(SEp) * pReqEpSet->numOfEps);
1682
  return TSDB_CODE_SUCCESS;
1683
}
1684

1685
int32_t transValidReqEpset(SReqEpSet* pReqEpSet) {
1686
  if (pReqEpSet == NULL) {
1687
    return TSDB_CODE_INVALID_PARA;
1688
  }
1689
  if (pReqEpSet->numOfEps == 0 || pReqEpSet->numOfEps > TSDB_MAX_EP_NUM || pReqEpSet->inUse >= TSDB_MAX_EP_NUM) {
1690
    return TSDB_CODE_INVALID_PARA;
1691
  }
1692
  return TSDB_CODE_SUCCESS;
1693
}
1694

1695
#endif  // TD_ASTRA_RPC
1696

1697
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
813,074✔
1698
  QUEUE_INIT(&wq->node);
813,074✔
1699
  wq->freeFunc = (void (*)(void*))freeFunc;
813,074✔
1700
  wq->size = 0;
813,074✔
1701
  wq->inited = 1;
813,074✔
1702
  return 0;
813,074✔
1703
}
1704
void transQueuePush(STransQueue* q, void* arg) {
123,990,930✔
1705
  queue* node = arg;
123,990,930✔
1706
  QUEUE_PUSH(&q->node, node);
123,990,930✔
1707
  q->size++;
123,990,930✔
1708
}
123,990,930✔
1709
void* transQueuePop(STransQueue* q) {
89,107,899✔
1710
  if (q->size == 0) return NULL;
89,107,899!
1711

1712
  queue* head = QUEUE_HEAD(&q->node);
89,107,899✔
1713
  QUEUE_REMOVE(head);
89,107,899✔
1714
  q->size--;
89,107,899✔
1715
  return head;
89,107,899✔
1716
}
1717
int32_t transQueueSize(STransQueue* q) { return q->size; }
552,657,734✔
1718

1719
void* transQueueGet(STransQueue* q, int idx) {
×
1720
  if (q->size == 0) return NULL;
×
1721

1722
  while (idx-- > 0) {
×
1723
    queue* node = QUEUE_NEXT(&q->node);
×
1724
    if (node == &q->node) return NULL;
×
1725
  }
1726
  return NULL;
×
1727
}
1728

1729
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
73,993,714✔
1730
  queue* d = dst;
73,993,714✔
1731
  queue* node = QUEUE_NEXT(&q->node);
73,993,714✔
1732
  while (node != &q->node) {
123,838,144✔
1733
    queue* next = QUEUE_NEXT(node);
70,628,240✔
1734
    if (filter && filter(node, arg)) {
70,628,240✔
1735
      QUEUE_REMOVE(node);
34,920,457✔
1736
      q->size--;
34,920,457✔
1737
      QUEUE_PUSH(d, node);
34,920,457✔
1738
      if (--size == 0) {
34,920,457✔
1739
        break;
20,779,940✔
1740
      }
1741
    }
1742
    node = next;
49,844,430✔
1743
  }
1744
}
73,989,844✔
1745

1746
void* tranQueueHead(STransQueue* q) {
×
1747
  if (q->size == 0) return NULL;
×
1748

1749
  queue* head = QUEUE_HEAD(&q->node);
×
1750
  return head;
×
1751
}
1752

1753
void* transQueueRm(STransQueue* q, int i) {
×
1754
  // if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
1755
  //   return NULL;
1756
  // }
1757
  // if (i >= taosArrayGetSize(queue->q)) {
1758
  //   return NULL;
1759
  // }
1760
  // void* ptr = taosArrayGetP(queue->q, i);
1761
  // taosArrayRemove(queue->q, i);
1762
  // return ptr;
1763
  return NULL;
×
1764
}
1765

1766
void transQueueRemove(STransQueue* q, void* e) {
×
1767
  if (q->size == 0) return;
×
1768
  queue* node = e;
×
1769
  QUEUE_REMOVE(node);
×
1770
  q->size--;
×
1771
}
1772

1773
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
69,409,747✔
1774

1775
void transQueueClear(STransQueue* q) {
210,317✔
1776
  if (q->inited == 0) return;
210,317!
1777
  while (!QUEUE_IS_EMPTY(&q->node)) {
210,317!
1778
    queue* h = QUEUE_HEAD(&q->node);
×
1779
    QUEUE_REMOVE(h);
×
1780
    if (q->freeFunc != NULL) (q->freeFunc)(h);
×
1781
    q->size--;
×
1782
  }
1783
}
1784
void transQueueDestroy(STransQueue* q) { transQueueClear(q); }
210,318✔
1785

1786
void transPrintEpSet(SEpSet* pEpSet) {
934✔
1787
  if (pEpSet == NULL) {
934!
1788
    tTrace("NULL epset");
×
1789
    return;
×
1790
  }
1791
  char buf[512] = {0};
934✔
1792
  int  len = tsnprintf(buf, sizeof(buf), "epset:{");
934✔
1793
  for (int i = 0; i < pEpSet->numOfEps; i++) {
2,884✔
1794
    if (i == pEpSet->numOfEps - 1) {
1,950✔
1795
      len += tsnprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
934✔
1796
    } else {
1797
      len += tsnprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
1,016✔
1798
    }
1799
  }
1800
  len += tsnprintf(buf + len, sizeof(buf) - len, "}");
934✔
1801
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
934!
1802
}
1803
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
20,598,316✔
1804
  if (a == NULL && b == NULL) {
20,598,316!
1805
    return true;
×
1806
  } else if (a == NULL || b == NULL) {
20,598,316!
1807
    return false;
×
1808
  }
1809

1810
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
20,613,314!
UNCOV
1811
    return false;
×
1812
  }
1813
  for (int i = 0; i < a->numOfEps; i++) {
40,436,833✔
1814
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
21,258,660!
1815
      return false;
1,441,660✔
1816
    }
1817
  }
1818
  return true;
19,178,173✔
1819
}
1820
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
66,688✔
1821
  if (a->numOfEps != b->numOfEps) {
66,688✔
1822
    return false;
401✔
1823
  }
1824
  for (int i = 0; i < a->numOfEps; i++) {
256,228✔
1825
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
190,007✔
1826
      return false;
66✔
1827
    }
1828
  }
1829
  return true;
66,221✔
1830
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc