• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.9
/source/libs/transport/src/transComm.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "transComm.h"
17
#include "osTime.h"
18
#include "tchecksum.h"
19
#include "tqueue.h"
20
#include "transLog.h"
21
#include "transSasl.h"
22

23
#ifndef TD_ASTRA_RPC
24
// Default capacity, in bytes, of a connection read buffer. The expansion is
// parenthesized so it behaves as a single operand in any expression
// (e.g. `x / BUFFER_CAP`, `BUFFER_CAP % n`).
#define BUFFER_CAP (8 * 1024)
25

26
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
27

28
static int32_t refMgt;
29
static int32_t svrRefMgt;
30
static int32_t instMgt;
31
static int32_t transSyncMsgMgt;
32

33
static void transDestroySyncMsg(void* msg);
34
typedef struct {
35
  int64_t refId;
36
  STrans* pTrans;
37
  int32_t ref;
38
} STransEntry;
39

40
/*
41
 * pArray: array of STransEntry; typically contains fewer than 8 entries.
42
 * lock: The lock protects concurrent access (reads/writes) to this array.
43
 * Access pattern is heavily read-dominated; write operations are rare.
44
 */
45

46
typedef struct {
47
  SArray*        pArray;
48
  TdThreadRwlock lock;
49
} STransCache;
50

51
static STransCache transInstCache;
52

53
static void transCacheInit() {
2,817,698✔
54
  transInstCache.pArray = taosArrayInit(16, sizeof(STransEntry));
2,817,698✔
55
  if (transInstCache.pArray == NULL) {
2,817,698✔
56
    tError("failed to init trans cache since %s", tstrerror(terrno));
×
57
    return;
×
58
  }
59

60
  (void)taosThreadRwlockInit(&transInstCache.lock, NULL);
2,817,698✔
61
}
62

63
/*
 * Append a (refId, pTrans) entry with a zero reference count to the cache.
 *
 * Returns 0 on success, or terrno when the array push fails.
 */
int32_t transCachePut(int64_t refId, STrans* pTrans) {
  int32_t rc = 0;
  (void)taosThreadRwlockWrlock(&transInstCache.lock);

  STransEntry item = {.refId = refId, .pTrans = pTrans, .ref = 0};
  void*       slot = taosArrayPush(transInstCache.pArray, &item);
  if (slot == NULL) {
    // read terrno while still holding the lock, as close to the failure as possible
    rc = terrno;
  }

  (void)taosThreadRwlockUnlock(&transInstCache.lock);
  return rc;
}
74

75
/*
 * Look up the cache entry whose refId matches and take one reference on it.
 *
 * On a hit, *pTrans receives the cached instance and 0 is returned.
 * On a miss, *pTrans is left untouched and TSDB_CODE_RPC_MODULE_QUIT is returned.
 */
int32_t transCacheAcquireById(int64_t refId, STrans** pTrans) {
  int32_t code = TSDB_CODE_RPC_MODULE_QUIT;

  (void)taosThreadRwlockRdlock(&transInstCache.lock);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(transInstCache.pArray)) {
    STransEntry* cur = taosArrayGet(transInstCache.pArray, idx);
    ++idx;
    if (cur->refId != refId) {
      continue;
    }

    *pTrans = cur->pTrans;
    // the count is bumped atomically; only the read lock is held here
    (void)atomic_fetch_add_32(&cur->ref, 1);
    tDebug("trans %p acquire by refId:%" PRId64 ", ref count:%d", cur->pTrans, refId, atomic_load_32(&cur->ref));
    code = 0;
    break;
  }

  (void)taosThreadRwlockUnlock(&transInstCache.lock);

  if (code != 0) {
    tError("failed to acquire from trans cache by refId:%" PRId64 " since %s", refId, tstrerror(code));
  }
  return code;
}
97

98
void transCacheReleaseByRefId(int64_t refId) {
2,147,483,647✔
99
  int32_t code = TSDB_CODE_RPC_MODULE_QUIT;
2,147,483,647✔
100

101
  (void)taosThreadRwlockRdlock(&transInstCache.lock);
2,147,483,647✔
102

103
  for (int32_t i = 0; i < taosArrayGetSize(transInstCache.pArray); i++) {
2,147,483,647✔
104
    STransEntry* p = taosArrayGet(transInstCache.pArray, i);
2,147,483,647✔
105
    if (p->refId == refId) {
2,147,483,647✔
106
      (void)atomic_sub_fetch_32(&p->ref, 1);
2,147,483,647✔
107
      tDebug("trans %p release by refId:%" PRId64 ", ref count:%d", p->pTrans, refId, atomic_load_32(&p->ref));
2,147,483,647✔
108
      code = 0;
2,147,483,647✔
109
      break;
2,147,483,647✔
110
    }
111
  }
112

113
  (void)taosThreadRwlockUnlock(&transInstCache.lock);
2,147,483,647✔
114
  if (code != 0) {
2,147,483,647✔
115
    tInfo("failed to remove from trans cache by refId:%" PRId64 " since %s", refId, tstrerror(code));
591✔
116
  }
117
}
2,147,483,647✔
118

119
void transCacheRemoveByRefId(int64_t refId) {
4,870,679✔
120
  int32_t code = TSDB_CODE_RPC_MODULE_QUIT;
4,870,679✔
121

122
  (void)taosThreadRwlockWrlock(&transInstCache.lock);
4,870,679✔
123

124
  for (int32_t i = 0; i < taosArrayGetSize(transInstCache.pArray); i++) {
6,999,900✔
125
    STransEntry* p = taosArrayGet(transInstCache.pArray, i);
6,999,900✔
126
    if (p->refId == refId) {
6,999,900✔
127
      taosArrayRemove(transInstCache.pArray, i);
4,870,679✔
128
      code = 0;
4,870,679✔
129
      break;
4,870,679✔
130
    }
131
  }
132
  (void)taosThreadRwlockUnlock(&transInstCache.lock);
4,870,679✔
133

134
  if (code != 0) {
4,870,679✔
135
    tError("failed to remove from trans cache by refId:%" PRId64 " since %s", refId, tstrerror(code));
×
136
  }
137
}
4,870,679✔
138

139
void transCacheDestroy() {
×
140
  taosArrayDestroyP(transInstCache.pArray, NULL);
×
141
  (void)taosThreadRwlockDestroy(&transInstCache.lock);
×
142
}
×
143

144
int32_t transCompressMsg(char* msg, int32_t len) {
10,845,089✔
145
  int32_t        ret = 0;
10,845,089✔
146
  int            compHdr = sizeof(STransCompMsg);
10,845,089✔
147
  STransMsgHead* pHead = transHeadFromCont(msg);
10,845,089✔
148

149
  int64_t start = taosGetTimestampMs();
10,844,524✔
150
  char*   buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
10,844,524✔
151
  if (buf == NULL) {
10,845,089✔
152
    tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len);
×
153
    ret = len;
×
154
    return ret;
×
155
  }
156

157
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
10,845,089✔
158
  /*
159
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
160
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
161
   */
162
  if (clen > 0 && clen < len - compHdr) {
10,844,524✔
163
    STransCompMsg* pComp = (STransCompMsg*)msg;
10,805,982✔
164
    pComp->reserved = 0;
10,805,982✔
165
    pComp->contLen = htonl(len);
10,805,982✔
166
    memcpy(msg + compHdr, buf, clen);
10,805,417✔
167

168
    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
10,803,722✔
169
    ret = clen + compHdr;
10,805,982✔
170
    pHead->comp = 1;
10,805,982✔
171
  } else {
172
    ret = len;
38,542✔
173
    pHead->comp = 0;
38,542✔
174
  }
175
  taosMemoryFree(buf);
10,846,219✔
176

177
  int64_t elapse = taosGetTimestampMs() - start;
10,846,219✔
178
  if (elapse >= 100) {
10,846,219✔
179
    tWarn("compress msg cost %dms", (int)(elapse));
17,328✔
180
  }
181
  return ret;
10,846,219✔
182
}
183
int32_t transDecompressMsg(char** msg, int32_t* len) {
2,147,483,647✔
184
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
2,147,483,647✔
185
  if (pHead->comp == 0) return 0;
2,147,483,647✔
186

187
  int64_t start = taosGetTimestampMs();
10,836,552✔
188

189
  char* pCont = transContFromHead(pHead);
10,836,552✔
190

191
  STransCompMsg* pComp = (STransCompMsg*)pCont;
10,836,552✔
192
  int32_t        oriLen = ntohl(pComp->contLen);
10,836,552✔
193

194
  int32_t tlen = *len;
10,837,117✔
195
  char*   buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
10,835,422✔
196
  if (buf == NULL) {
10,833,727✔
197
    return terrno;
×
198
  }
199

200
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
10,833,727✔
201
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
10,833,727✔
202
                                                 tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
14,504✔
203

204
  if (decompLen != oriLen) {
10,835,422✔
205
    taosMemoryFree(buf);
×
206
    return TSDB_CODE_INVALID_MSG;
×
207
  }
208
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
10,835,422✔
209

210
  *len = oriLen + sizeof(STransMsgHead);
10,835,422✔
211
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
10,835,987✔
212

213
  taosMemoryFree(pHead);
10,837,682✔
214
  *msg = buf;
10,835,422✔
215

216
  int64_t elapse = taosGetTimestampMs() - start;
10,837,117✔
217
  if (elapse >= 100) {
10,837,117✔
218
    tWarn("dcompress msg cost %dms", (int)(elapse));
×
219
  }
220
  return 0;
10,836,552✔
221
}
UNCOV
222
int32_t transDecompressMsgExt(char const* msg, int32_t len, char** out, int32_t* outLen) {
×
UNCOV
223
  STransMsgHead* pHead = (STransMsgHead*)msg;
×
UNCOV
224
  char*          pCont = transContFromHead(pHead);
×
225

UNCOV
226
  STransCompMsg* pComp = (STransCompMsg*)pCont;
×
UNCOV
227
  int32_t        oriLen = ntohl(pComp->contLen);
×
228

UNCOV
229
  int32_t tlen = len;
×
UNCOV
230
  char*   buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
×
UNCOV
231
  if (buf == NULL) {
×
232
    return terrno;
×
233
  }
UNCOV
234
  int64_t start = taosGetTimestampMs();
×
235

UNCOV
236
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
×
UNCOV
237
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
×
238
                                                 tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
×
UNCOV
239
  if (decompLen != oriLen) {
×
240
    tError("msgLen:%d, originLen:%d, decompLen:%d", len, oriLen, decompLen);
×
241
    taosMemoryFree(buf);
×
242
    return TSDB_CODE_INVALID_MSG;
×
243
  }
UNCOV
244
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
×
245

UNCOV
246
  *out = buf;
×
UNCOV
247
  *outLen = oriLen + sizeof(STransMsgHead);
×
UNCOV
248
  pNewHead->msgLen = *outLen;
×
UNCOV
249
  pNewHead->comp = 0;
×
250

UNCOV
251
  int64_t elapse = taosGetTimestampMs() - start;
×
UNCOV
252
  if (elapse >= 100) {
×
253
    tWarn("dcompress msg cost %dms", (int)(elapse));
×
254
  }
UNCOV
255
  return 0;
×
256
}
257

258
void transFreeMsg(void* msg) {
2,147,483,647✔
259
  if (msg == NULL) {
2,147,483,647✔
260
    return;
2,147,483,647✔
261
  }
262
  tTrace("cont:%p, rpc free", (char*)msg - TRANS_MSG_OVERHEAD);
2,147,483,647✔
263
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
2,147,483,647✔
264
}
265
void transSockInfo2Str(struct sockaddr* sockname, char* dst, int32_t cap) {
148,721,846✔
266
  char     buf[IP_RESERVE_CAP] = {0};
148,721,846✔
267
  uint16_t port = 0;
148,738,221✔
268
  int      r = 0;
148,738,221✔
269
  if (sockname->sa_family == AF_INET) {
148,738,221✔
270
    struct sockaddr_in* addr = (struct sockaddr_in*)sockname;
148,718,100✔
271

272
    r = uv_ip4_name(addr, (char*)buf, sizeof(buf));
148,718,100✔
273
    if (r != 0) {
148,776,268✔
274
      uError("failed to get ip from sockaddr, err:%s", uv_strerror(r));
×
275
    }
276

277
    port = ntohs(addr->sin_port);
148,776,385✔
278
  } else if (sockname->sa_family == AF_INET6) {
26✔
279
    struct sockaddr_in6* addr = (struct sockaddr_in6*)sockname;
×
280

281
    r = uv_ip6_name(addr, buf, sizeof(buf));
×
282
    if (r != 0) {
×
283
      uError("failed to get ip from sockaddr6, err:%s", uv_strerror(r));
×
284
    }
285

286
    port = ntohs(addr->sin6_port);
×
287
  }
288
  snprintf(dst, cap, "%s:%d", buf, port);
148,742,072✔
289
}
148,757,421✔
290
/*
 * Allocate the zero-filled backing storage (BUFFER_CAP bytes) of a connection
 * buffer and reset its bookkeeping: nothing read yet, message length unknown
 * (left == -1).
 *
 * Returns 0 on success or terrno when the allocation fails.
 */
int32_t transInitBuffer(SConnBuffer* buf) {
  char* mem = taosMemoryCalloc(1, BUFFER_CAP);
  buf->buf = mem;
  if (mem == NULL) {
    return terrno;
  }

  buf->cap = BUFFER_CAP;
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;
  return 0;
}
303
// Release the backing storage of a connection buffer and clear the pointer so
// that a later destroy of the same buffer frees nothing.
void transDestroyBuffer(SConnBuffer* p) {
  void* mem = p->buf;
  taosMemoryFree(mem);
  p->buf = NULL;
}
89,316,202✔
307

308
/*
 * Reset a connection buffer to its initial, empty state, shrinking the backing
 * storage back to BUFFER_CAP when it had grown beyond that.
 *
 * Returns 0 on success. When the shrink fails terrno is returned and the
 * buffer keeps its previous storage, capacity and contents, so the caller can
 * still destroy it without leaking.
 */
int32_t transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    // realloc into a temporary: on failure the old block is still owned by p->buf
    char* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk == NULL) {
      return terrno;
    }
    p->buf = shrunk;
    p->cap = BUFFER_CAP;
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}
323

324
/*
 * Copy the completed message out of the connection buffer.
 *
 * Requires that a full message has been read (left == 0, total > 0). On
 * success *buf is a freshly allocated copy of the message, *len its length,
 * and the connection buffer has been advanced past it via transResetBuffer
 * (resetBuf is forwarded to it).
 *
 * Returns 0 on success; TSDB_CODE_INVALID_MSG when no complete/valid message
 * is available (then *len is -1 if the message was too short or flagged
 * invalid); terrno on allocation failure; or the error from transResetBuffer /
 * transDoCrcCheck. On every failure after the copy was allocated, the copy is
 * freed and *buf is set to NULL.
 */
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf, int32_t* len) {
  static const int HEADSIZE = sizeof(STransMsgHead);
  int32_t          code = 0;
  SConnBuffer*     p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return TSDB_CODE_INVALID_MSG;
  }
  int total = p->total;
  if (total >= HEADSIZE && !p->invalid) {
    *buf = taosMemoryCalloc(1, total);
    if (*buf == NULL) {
      return terrno;
    }
    memcpy(*buf, p->buf, total);
    if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
      // same ownership rule as the crc failure below: nothing is handed out on error
      taosMemoryFree(*buf);
      *buf = NULL;
      return code;
    }

    // the checksum is only verified once the reset left no pending message length
    if (connBuf->total == 0) {
      code = transDoCrcCheck(*buf, total);
      if (code != 0) {
        tError("failed to check crc for msg in buffer, total:%d since %s", total, tstrerror(code));
        taosMemoryFree(*buf);
        *buf = NULL;
        return code;
      }
    }
    *len = total;
  } else {
    *len = -1;
    code = TSDB_CODE_INVALID_MSG;
  }
  return code;
}
358

359
/*
 * Advance the connection buffer past the message that was just consumed.
 *
 * - total < len: bytes of the next message are already buffered; they are
 *   moved to the front and the length bookkeeping restarts.
 * - total == len: the buffer is now empty; if it had grown beyond BUFFER_CAP
 *   and resetBuf is non-zero, it is shrunk back to BUFFER_CAP.
 * - total > len: inconsistent state, TSDB_CODE_INVALID_MSG is returned.
 *
 * Returns 0 on success. When the shrink fails terrno is returned and the
 * buffer keeps its previous (larger) storage and capacity.
 */
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
  SConnBuffer* p = connBuf;
  if (p->total < p->len) {
    int left = p->len - p->total;
    memmove(p->buf, p->buf + p->total, left);
    p->left = -1;
    p->total = 0;
    p->len = left;
  } else if (p->total == p->len) {
    p->left = -1;
    p->total = 0;
    p->len = 0;
    if (p->cap > BUFFER_CAP && resetBuf) {
      // realloc into a temporary: on failure the old block is still owned by p->buf
      char* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
      if (shrunk == NULL) {
        return terrno;
      }
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  } else {
    tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
    return TSDB_CODE_INVALID_MSG;
  }
  return 0;
}
386
/*
 * Describe to libuv where the next socket read should land.
 *
 * format of data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<---userdata--->|<---auth data--->|<---user info--->|
 *
 * While the message length is still unknown (left == -1) the whole free tail
 * of the buffer is offered. Once it is known, exactly `left` bytes are
 * offered, growing the buffer first when the tail is too small.
 *
 * Returns 0 on success. When growing fails, uvBuf is set to {NULL, 0}, terrno
 * is returned, and the buffer keeps its previous storage and capacity.
 */
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;
  uvBuf->base = p->buf + p->len;
  if (p->left == -1) {
    uvBuf->len = p->cap - p->len;
  } else {
    if (p->left < p->cap - p->len) {
      uvBuf->len = p->left;
    } else {
      int32_t newCap = p->left + p->len;
      // realloc into a temporary: on failure the old block is still owned by p->buf
      char* grown = taosMemoryRealloc(p->buf, newCap);
      if (grown == NULL) {
        uvBuf->base = NULL;
        uvBuf->len = 0;
        return terrno;
      }
      p->buf = grown;
      p->cap = newCap;
      uvBuf->base = p->buf + p->len;
      uvBuf->len = p->left;
    }
  }
  return 0;
}
414
// check whether already read complete
415
/*
 * Update the buffer's bookkeeping after a read and report whether a complete
 * message is available.
 *
 * Once a full head has been buffered and the length is still unknown
 * (left == -1), the total length is taken from the head's msgLen (network byte
 * order) and the message is flagged invalid when the version differs from
 * TRANS_VER or the length reaches TRANS_MSG_LIMIT. `left` then becomes the
 * number of bytes still missing, or 0 when the buffer already holds more than
 * one message.
 *
 * Returns true when nothing is left to read or the message is invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* b = connBuf;

  if (b->len >= sizeof(STransMsgHead)) {
    if (b->left == -1) {
      STransMsgHead hdr;
      memcpy((char*)&hdr, connBuf->buf, sizeof(hdr));

      int32_t msgLen = (int32_t)ntohl(hdr.msgLen);
      b->total = msgLen;
      b->invalid = (hdr.version != TRANS_VER || msgLen >= TRANS_MSG_LIMIT);
      if (b->invalid) {
        tError("recv invalid msg, version:%d, expect:%d, msg len %d, limit:%d", hdr.version, TRANS_VER, msgLen, (int)(TRANS_MSG_LIMIT));
      }
    }

    if (b->total < b->len) {
      // more than one message is buffered; the first one is complete
      b->left = 0;
    } else {
      b->left = b->total - b->len;
    }
  }

  if (b->left == 0 || b->invalid) {
    return true;
  }
  return false;
}
436

437
/*
 * Append `len` bytes from `buf` to the connection buffer, growing the backing
 * storage to exactly len-so-far + len when the current capacity is too small.
 *
 * Returns 0 on success, or terrno when growing fails (the buffer is then
 * unchanged).
 */
int32_t transConnBufferAppend(SConnBuffer* connBuf, char* buf, int32_t len) {
  SConnBuffer* dst = connBuf;

  if (dst->len + len > dst->cap) {
    int32_t need = dst->len + len;
    char*   grown = taosMemoryRealloc(dst->buf, need);
    if (grown == NULL) {
      return terrno;
    }
    dst->cap = need;
    dst->buf = grown;
  }

  memcpy(dst->buf + dst->len, buf, len);
  dst->len += len;
  return 0;
}
454

455
/*
 * Apply the socket options used for rpc connections.
 *
 * TCP keepalive (with `keepalive` as the delay passed to libuv) is enabled on
 * platforms other than WINDOWS and DARWIN; a failure there is logged but does
 * not affect the result. TCP_NODELAY is then enabled, and its libuv status is
 * the return value (0 on success).
 */
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) {
  int32_t ret = 0;
#if defined(WINDOWS) || defined(DARWIN)
#else
  ret = uv_tcp_keepalive(stream, 1, keepalive);
  if (ret != 0) {
    // previously this status was silently overwritten by the nodelay call
    tWarn("failed to set tcp keepalive since %s", uv_err_name(ret));
  }
#endif
  ret = uv_tcp_nodelay(stream, 1);
  return ret;
}
465

466
/*
 * Create a pool of `sz` libuv async handles on `loop`.
 *
 * Each handle gets its own SAsyncItem (message queue + mutex) whose pThrd is
 * `arg`, and is initialised with `cb` as its callback.
 *
 * On success 0 is returned and *pPool receives the pool. If any handle cannot
 * be set up the pool is destroyed, *pPool is set to NULL and a non-zero error
 * code is returned. If one of the two initial allocations fails, terrno is
 * returned and *pPool is not written.
 */
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) {
  int32_t code = 0;

  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    return terrno;
  }

  pool->nAsync = sz;
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL) {
    code = terrno;  // capture before the free below
    taosMemoryFree(pool);
    return code;
  }

  int i = 0, err = 0;
  for (i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);

    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      code = terrno;
      break;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    code = taosThreadMutexInit(&item->mtx, NULL);
    if (code) {
      taosMemoryFree(item);
      break;
    }

    // attach before uv_async_init so that the destroy path below frees the item
    async->data = item;
    err = uv_async_init(loop, async, cb);
    if (err != 0) {
      tError("failed to init async since %s", uv_err_name(err));
      code = TSDB_CODE_THIRDPARTY_ERROR;
      break;
    }
  }

  if (i != pool->nAsync) {
    // NOTE(review): handles that were already initialised are not uv_close()d
    // before their memory is released — confirm this is safe for the loop.
    transAsyncPoolDestroy(pool);
    pool = NULL;
    if (code == 0) {
      // never report success together with a NULL pool
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  *pPool = pool;
  return code;
}
516

517
/*
 * Free an async pool: for every handle that has an item attached, destroy the
 * item's mutex and free the item; then free the handle array and the pool.
 * NULL is accepted and ignored.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  if (pool == NULL) return;

  int slot = 0;
  while (slot < pool->nAsync) {
    uv_async_t* handle = &(pool->asyncs[slot]);
    slot++;

    SAsyncItem* box = handle->data;
    if (box != NULL) {
      TAOS_UNUSED(taosThreadMutexDestroy(&box->mtx));
      taosMemoryFree(box);
    }
  }

  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
531
// Report whether every handle's message queue in the pool is empty.
// The queues are inspected without taking the per-item mutex.
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  int slot = 0;
  while (slot < pool->nAsync) {
    SAsyncItem* box = pool->asyncs[slot].data;
    if (!QUEUE_IS_EMPTY(&box->qmsg)) {
      return false;
    }
    slot++;
  }
  return true;
}
539
/*
 * Queue `q` on one of the pool's handles and wake its loop.
 *
 * The handle is picked from pool->index modulo nAsync; the index is then
 * advanced and wrapped to 0 once it exceeds nAsync * 2000. The message is
 * pushed under the item's mutex, after which uv_async_send is called.
 *
 * Returns 0 on success, TSDB_CODE_RPC_ASYNC_MODULE_QUIT when the pool is
 * stopped, terrno when the mutex cannot be locked, or
 * TSDB_CODE_THIRDPARTY_ERROR when uv_async_send fails.
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return TSDB_CODE_RPC_ASYNC_MODULE_QUIT;
  }

  int slot = pool->index % pool->nAsync;

  // no need mutex here
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* handle = &(pool->asyncs[slot]);
  SAsyncItem* box = handle->data;

  if (taosThreadMutexLock(&box->mtx) != 0) {
    tError("failed to lock mutex since %s", tstrerror(terrno));
    return terrno;
  }
  QUEUE_PUSH(&box->qmsg, q);
  TAOS_UNUSED(taosThreadMutexUnlock(&box->mtx));

  int rc = uv_async_send(handle);
  if (rc == 0) {
    return 0;
  }

  tError("failed to send async since %s", uv_err_name(rc));
  return TSDB_CODE_THIRDPARTY_ERROR;
}
567

568
// Initialise a transport context: an unlocked hash keyed by int for the
// per-message-type values, no broken-link value and no free function yet.
// The result of taosHashInit is stored as is (it is not checked here).
void transCtxInit(STransCtx* ctx) {
  ctx->freeFunc = NULL;
  ctx->brokenVal.val = NULL;
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
}
×
574
/*
 * Release everything a transport context owns: each per-message-type value and
 * the broken-link value are passed to ctx->freeFunc (when one is set), then
 * the hash itself is destroyed and ctx->args is cleared.
 *
 * A NULL ctx or a context without a hash is ignored.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx == NULL || ctx->args == NULL) {
    return;
  }

  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
  while (iter) {
    int32_t* type = taosHashGetKey(iter, NULL);
    tDebug("free msg type %s dump func", TMSG_INFO(*type));
    // guarded like the broken-link value below; freeFunc may be unset
    if (ctx->freeFunc) ctx->freeFunc(iter->val);
    iter = taosHashIterate(ctx->args, iter);
  }
  if (ctx->freeFunc) ctx->freeFunc(ctx->brokenVal.val);
  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}
590

591
/*
 * Move the contents of `src` into `dst`.
 *
 * Nothing happens when src has no hash or no free function. When dst has no
 * hash yet it simply takes over src's hash, broken-link value and free
 * function. Otherwise dst adopts src's broken-link value and free function,
 * every entry of src's hash replaces the entry with the same message type in
 * dst, the previous broken-link value of dst is freed with dst's previous free
 * function, and src's hash is destroyed.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }

  if (dst->args == NULL) {
    // dst is empty: hand the whole table over without copying
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  // remember what dst held so it can be released after the merge
  SRpcBrokenlinkVal oldBroken = dst->brokenVal;
  void (*oldFree)(const void* arg) = dst->freeFunc;

  dst->brokenVal = src->brokenVal;
  dst->freeFunc = src->freeFunc;

  size_t keyLen = 0;
  for (void* it = taosHashIterate(src->args, NULL); it != NULL; it = taosHashIterate(src->args, it)) {
    STransCtxVal* from = (STransCtxVal*)it;
    int32_t*      msgType = taosHashGetKey(from, &keyLen);

    STransCtxVal* existing = taosHashGet(dst->args, msgType, sizeof(*msgType));
    if (existing != NULL) {
      tDebug("free msg type %s dump func", TMSG_INFO(*(int32_t*)msgType));
      // NOTE(review): dst->freeFunc is already src's function at this point,
      // so the replaced dst value is released with src's free function —
      // confirm both contexts always share the same one.
      dst->freeFunc(existing->val);
      existing->val = NULL;

      TAOS_UNUSED(taosHashRemove(dst->args, msgType, sizeof(*msgType)));
    }

    int32_t code = taosHashPut(dst->args, msgType, sizeof(*msgType), from, sizeof(*from));
    if (code != 0) {
      tError("failed to put val to hash since %s", tstrerror(code));
      tDebug("put msg type %s dump func", TMSG_INFO(*(int32_t*)msgType));
      if (src->freeFunc) (src->freeFunc)(from->val);
      from->val = NULL;
    }
  }

  if (oldFree != NULL && oldBroken.val != NULL) {
    oldFree(oldBroken.val);
    oldBroken.val = NULL;
  }

  taosHashCleanup(src->args);
  src->args = NULL;
  src->brokenVal.val = NULL;
}
645
/*
 * Return a clone of the value registered in ctx for message type `key`.
 *
 * The clone is produced by the entry's own clone callback. NULL is returned
 * when the context has no hash or no entry for that key.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  void* copy = NULL;

  if (ctx->args != NULL) {
    STransCtxVal* entry = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
    if (entry != NULL) {
      TAOS_UNUSED((*entry->clone)(entry->val, &copy));
    }
  }
  return copy;
}
657
/*
 * Return a clone of the context's broken-link value and store its message type
 * in *msgType.
 *
 * When no clone callback is set, NULL is returned and *msgType is not written.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  void* copy = NULL;

  if (ctx->brokenVal.clone != NULL) {
    TAOS_UNUSED((*ctx->brokenVal.clone)(ctx->brokenVal.val, &copy));
    *msgType = ctx->brokenVal.msgType;
  }
  return copy;
}
668

669
int32_t transDoCrc(char* buf, int32_t len) {
2,147,483,647✔
670
  STransMsgHead* pHead = (STransMsgHead*)buf;
2,147,483,647✔
671
  pHead->magicNum = 0;
2,147,483,647✔
672
  uint32_t chechSum = taosCalcChecksum(0, (const uint8_t*)buf, len);
2,147,483,647✔
673
  pHead->magicNum = htonl(chechSum);
2,147,483,647✔
674

675
  return 0;
2,147,483,647✔
676
}
677
int32_t transDoCrcCheck(char* buf, int32_t len) {
2,147,483,647✔
678
  STransMsgHead* pHead = (STransMsgHead*)buf;
2,147,483,647✔
679
  uint32_t       checkSum = ntohl(pHead->magicNum);
2,147,483,647✔
680
  pHead->magicNum = 0;
2,147,483,647✔
681
  if (taosCheckChecksum((const uint8_t*)buf, len, checkSum)) {
2,147,483,647✔
682
    return TSDB_CODE_INVALID_MSG;
×
683
  } else {
684
    return 0;
2,147,483,647✔
685
  }
686
}
687

688
#if 0
689
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
690
  QUEUE_INIT(&wq->node);
691
  wq->freeFunc = (void (*)(void*))freeFunc;
692
  wq->size = 0;
693
  wq->inited = 1;
694
  return 0;
695
}
696
void transQueuePush(STransQueue* q, void* arg) {
697
  queue* node = arg;
698
  QUEUE_PUSH(&q->node, node);
699
  q->size++;
700
}
701
void* transQueuePop(STransQueue* q) {
702
  if (q->size == 0) return NULL;
703

704
  queue* head = QUEUE_HEAD(&q->node);
705
  QUEUE_REMOVE(head);
706
  q->size--;
707
  return head;
708
}
709
int32_t transQueueSize(STransQueue* q) { return q->size; }
710

711
void* transQueueGet(STransQueue* q, int idx) {
712
  if (q->size == 0) return NULL;
713

714
  while (idx-- > 0) {
715
    queue* node = QUEUE_NEXT(&q->node);
716
    if (node == &q->node) return NULL;
717
  }
718
  return NULL;
719
}
720

721
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size)
722
{
723
  queue* d = dst;
724
  queue* node = QUEUE_NEXT(&q->node);
725
  while (node != &q->node) {
726
    queue* next = QUEUE_NEXT(node);
727
    if (filter && filter(node, arg)) {
728
      QUEUE_REMOVE(node);
729
      q->size--;
730
      QUEUE_PUSH(d, node);
731
      if (--size == 0) {
732
        break;
733
      }
734
    }
735
    node = next;
736
  }
737
}
738

739
void* tranQueueHead(STransQueue* q) {
740
  if (q->size == 0) return NULL;
741

742
  queue* head = QUEUE_HEAD(&q->node);
743
  return head;
744
}
745

746
void* transQueueRm(STransQueue* q, int i) {
747
  // if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
748
  //   return NULL;
749
  // }
750
  // if (i >= taosArrayGetSize(queue->q)) {
751
  //   return NULL;
752
  // }
753
  // void* ptr = taosArrayGetP(queue->q, i);
754
  // taosArrayRemove(queue->q, i);
755
  // return ptr;
756
  return NULL;
757
}
758

759
void transQueueRemove(STransQueue* q, void* e) {
760
  if (q->size == 0) return;
761
  queue* node = e;
762
  QUEUE_REMOVE(node);
763
  q->size--;
764
}
765

766
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
767

768
void transQueueClear(STransQueue* q) {
769
  if (q->inited == 0) return;
770
  while (!QUEUE_IS_EMPTY(&q->node)) {
771
    queue* h = QUEUE_HEAD(&q->node);
772
    QUEUE_REMOVE(h);
773
    if (q->freeFunc != NULL) (q->freeFunc)(h);
774
    q->size--;
775
  }
776
}
777
void transQueueDestroy(STransQueue* q) { transQueueClear(q); }
778
#endif
779

780
// Heap ordering callback for delayed tasks: yields 1 when a's execTime is not
// later than b's, and 0 when a is scheduled strictly after b.
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);
  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}
789

790
/*
 * Timer callback of a delay queue.
 *
 * Runs, in heap order, every task whose execTime is not later than the time
 * sampled on entry; each such task is removed from the heap, executed and
 * freed. If a not-yet-due task remains at the top, the timer is restarted for
 * the time that is still missing.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* dq = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t wait = 0;
  int64_t  now = taosGetTimestampMs();

  for (;;) {
    HeapNode* top = heapMin(dq->heap);
    if (top == NULL) break;

    SDelayTask* due = container_of(top, SDelayTask, node);
    if (due->execTime > now) {
      // earliest task is still in the future: remember how long to sleep
      wait = due->execTime - now;
      break;
    }

    // detach the task before running it
    heapRemove(dq->heap, top);
    due->func(due->arg);
    taosMemoryFree(due);
    wait = 0;
  }

  if (wait != 0) {
    TAOS_UNUSED(uv_timer_start(dq->timer, transDQTimeout, wait, 0));
  }
}
35,899,472✔
813
/*
 * Create a delay queue (a heap of SDelayTask ordered by timeCompare plus one
 * libuv timer) bound to `loop`.
 *
 * On success 0 is returned and *queue receives the new queue. On failure a
 * non-zero code describing the actual cause is returned, everything allocated
 * so far is released and *queue is not written.
 */
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  int32_t      code = 0;
  int          err = 0;
  Heap*        heap = NULL;
  uv_timer_t*  timer = NULL;
  SDelayQueue* q = NULL;

  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  if (timer == NULL) {
    return terrno;
  }

  heap = heapCreate(timeCompare);
  if (heap == NULL) {
    // never continue with a NULL heap, even if terrno was not set
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return1;
  }

  q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  if (q == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return1;
  }
  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;  // lets the timer callback find its queue

  err = uv_timer_init(loop, timer);
  if (err != 0) {
    tError("failed to init timer since %s", uv_err_name(err));
    code = TSDB_CODE_THIRDPARTY_ERROR;
    goto _return1;
  }

  *queue = q;
  return 0;

_return1:
  taosMemoryFree(timer);
  if (heap != NULL) {
    heapDestroy(heap);
  }
  taosMemoryFree(q);
  return code;
}
852

853
/*
 * Destroy a delay queue and every task still pending in it.
 *
 * For each pending task, `freeFunc` (when non-NULL) is called on the task's
 * argument, then the argument and the task are freed. Finally the heap and the
 * queue itself are released. The timer memory is freed up front.
 *
 * @param queue     queue to destroy; NULL is a no-op.
 * @param freeFunc  optional hook invoked on each pending task argument.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  if (queue == NULL) {
    return;
  }
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // Stop draining but still release the heap and the queue below;
      // returning here would leak both.
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
877
/*
 * Cancel a scheduled task and free it together with its argument.
 *
 * The queue timer is stopped first. If other tasks remain after removal, the
 * timer is re-armed for the earliest one using the time left until its
 * execTime (0 when it is already due).
 *
 * @param queue  delay queue the task was scheduled on.
 * @param task   task returned by transDQSched(); invalid after this call.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  if (queue == NULL || task == NULL) {
    return;
  }
  TAOS_UNUSED(uv_timer_stop(queue->timer));

  if (heapSize(queue->heap) <= 0) {
    // Nothing is queued, so the task cannot be in the heap: just release it.
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
  heapRemove(queue->heap, &task->node);

  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* next = container_of(minNode, SDelayTask, node);
    // Remaining time until the earliest task is due. The previous expression
    // computed `now - execTime`, which fired future tasks immediately and
    // delayed overdue ones even further.
    uint64_t timeout = next->execTime > now ? next->execTime - now : 0;

    TAOS_UNUSED(uv_timer_start(queue->timer, transDQTimeout, timeout, 0));
  }
}
901

902
/*
 * Schedule `func(arg)` to run `timeoutMs` milliseconds from now.
 *
 * The new task is inserted into the queue heap and the queue timer is
 * (re)started. When the current earliest task is due before the new one, the
 * timer is armed for that earlier task instead.
 *
 * @return the new task, or NULL when the task cannot be allocated.
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    nowMs = taosGetTimestampMs();
  SDelayTask* pNew = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->func = func;
  pNew->arg = arg;
  pNew->execTime = nowMs + timeoutMs;

  // Look at the earliest task *before* inserting the new one.
  HeapNode* pHead = heapMin(queue->heap);
  if (pHead != NULL) {
    SDelayTask* pEarliest = container_of(pHead, SDelayTask, node);
    if (pEarliest->execTime < pNew->execTime) {
      if (pEarliest->execTime <= nowMs) {
        timeoutMs = 0;
      } else {
        timeoutMs = pEarliest->execTime - nowMs;
      }
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &pNew->node);
  TAOS_UNUSED(uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0));
  return pNew;
}
926

927
#if 0
928
void transPrintEpSet(SEpSet* pEpSet) {
929
  if (pEpSet == NULL) {
930
    tTrace("NULL epset");
931
    return;
932
  }
933
  char buf[512] = {0};
934
  int  len = snprintf(buf, sizeof(buf), "epset:{");
935
  for (int i = 0; i < pEpSet->numOfEps; i++) {
936
    if (i == pEpSet->numOfEps - 1) {
937
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
938
    } else {
939
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
940
    }
941
  }
942
  len += snprintf(buf + len, sizeof(buf) - len, "}");
943
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
944
}
945
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
946
  if (a == NULL && b == NULL) {
947
    return true;
948
  } else if (a == NULL || b == NULL) {
949
    return false;
950
  }
951

952
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
953
    return false;
954
  }
955
  for (int i = 0; i < a->numOfEps; i++) {
956
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
957
      return false;
958
    }
959
  }
960
  return true;
961
}
962
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
963
  if (a->numOfEps != b->numOfEps) {
964
    return false;
965
  }
966
  for (int i = 0; i < a->numOfEps; i++) {
967
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
968
      return false;
969
    }
970
  }
971
  return true;
972
}
973
#endif
974

975
static void transInitEnv() {
2,817,698✔
976
  refMgt = transOpenRefMgt(50000, transDestroyExHandle);
2,817,698✔
977
  svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
2,817,698✔
978
  instMgt = taosOpenRef(50, rpcCloseImpl);
2,817,698✔
979
  transCacheInit();
2,817,698✔
980

981
  transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
2,817,698✔
982
  TAOS_UNUSED(uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"));
2,817,698✔
983

984
  saslLibInit();
2,817,698✔
985
}
2,817,698✔
986
static void transDestroyEnv() {
2,144,280✔
987
  transCloseRefMgt(refMgt);
2,144,280✔
988
  transCloseRefMgt(svrRefMgt);
2,144,280✔
989
  transCloseRefMgt(instMgt);
2,144,280✔
990
  transCloseRefMgt(transSyncMsgMgt);
2,144,280✔
991
}
2,144,280✔
992

993
int32_t transInit() {
6,360,459✔
994
  // init env
995
  int32_t code = taosThreadOnce(&transModuleInit, transInitEnv);
6,360,459✔
996
  if (code != 0) {
6,360,459✔
997
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
998
  }
999
  return code;
6,360,459✔
1000
}
1001

1002
/* Return the id of the reference pool stored in refMgt. */
int32_t transGetRefMgt() {
  return refMgt;
}
2,147,483,647✔
1003
/* Return the id of the reference pool stored in svrRefMgt. */
int32_t transGetSvrRefMgt() {
  return svrRefMgt;
}
×
1004
/* Return the id of the reference pool stored in instMgt. */
int32_t transGetInstMgt() {
  return instMgt;
}
771,203,109✔
1005
/* Return the id of the reference pool stored in transSyncMsgMgt. */
int32_t transGetSyncMsgMgt() {
  return transSyncMsgMgt;
}
348,244,258✔
1006

1007
/* Tear down the module environment by delegating to transDestroyEnv. */
void transCleanup() {
  transDestroyEnv();
}
2,144,280✔
1011
/*
 * Open a reference pool; thin wrapper over taosOpenRef.
 *
 * @param size  forwarded to taosOpenRef.
 * @param func  destructor forwarded to taosOpenRef.
 * @return whatever taosOpenRef returns.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
1015
/* Close the reference pool `mgt`; thin wrapper over taosCloseRef. */
void transCloseRefMgt(int32_t mgt) {
  taosCloseRef(mgt);
}
8,577,120✔
1019
/*
 * Register `p` in the reference pool `refMgt`.
 *
 * @return the value returned by taosAddRef.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
1023
/*
 * Remove `refId` from the reference pool `refMgt`.
 * A failed removal is only traced; nothing is reported to the caller.
 */
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
  if (taosRemoveRef(refMgt, refId) != 0) {
    tTrace("failed to remove %" PRId64 " from resetId:%d", refId, refMgt);
  }
}
1,107,913,049✔
1030

1031
/*
 * Acquire the handle registered under `refId` in pool `refMgt`.
 *
 * @return the result of taosAcquireRef, cast to void*.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}
1034

1035
/*
 * Release a handle previously acquired from pool `refMgt`.
 * A failed release is only traced; nothing is reported to the caller.
 */
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
  if (taosReleaseRef(refMgt, refId) != 0) {
    tTrace("failed to release %" PRId64 " from resetId:%d", refId, refMgt);
  }
}
2,147,483,647✔
1042
void transDestroyExHandle(void* handle) {
1,102,676,563✔
1043
  if (handle == NULL) {
1,102,676,563✔
1044
    return;
×
1045
  }
1046
  SExHandle* eh = handle;
1,102,676,563✔
1047
  tDebug("trans destroy sid:%" PRId64 ", memory %p", eh->refId, handle);
1,102,676,563✔
1048
  taosMemoryFree(handle);
1,102,738,064✔
1049
}
1050

1051
void transDestroySyncMsg(void* msg) {
54,663,627✔
1052
  if (msg == NULL) return;
54,663,627✔
1053

1054
  STransSyncMsg* pSyncMsg = msg;
54,663,627✔
1055
  TAOS_UNUSED(tsem2_destroy(pSyncMsg->pSem));
54,663,627✔
1056
  taosMemoryFree(pSyncMsg->pSem);
54,662,087✔
1057
  transFreeMsg(pSyncMsg->pRsp->pCont);
54,663,627✔
1058
  taosMemoryFree(pSyncMsg->pRsp);
54,663,627✔
1059
  taosMemoryFree(pSyncMsg);
54,663,627✔
1060
}
1061

1062
/*
 * Return pRange->ip with its four bytes in reversed order
 * (byte 0 <-> byte 3, byte 1 <-> byte 2).
 */
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
  const uint32_t raw = pRange->ip;

  uint32_t swapped = (raw >> 24) & 0xFF;
  swapped |= (raw & 0xFF0000) >> 8;
  swapped |= (raw & 0xFF00) << 8;
  swapped |= (raw & 0xFF) << 24;
  return swapped;
}
1066
/*
 * Initialise `pUtils` from an IPv4 range.
 *
 * - mask == 32: type 0 (single address); `address` stores pRange->ip as is.
 * - mask <  32: type 1 (subnet); `address` is the byte-swapped ip and
 *   netmask / network / broadcast are derived from the prefix length.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments or a prefix
 *         length above 32.
 */
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
  if (pUtils == NULL || pRange == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // A prefix above 32 would require a negative shift below.
  if ((uint32_t)pRange->mask > 32) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pRange->mask == 32) {
    pUtils->type = 0;
    pUtils->address = pRange->ip;
    return 0;
  }
  pUtils->address = subnetIpRang2Int(pRange);

  // Build the mask from scratch with unsigned arithmetic: the old loop OR-ed
  // bits into whatever the caller left in `netmask` and shifted a signed 1
  // into the sign bit. A /0 prefix yields an all-zero mask.
  uint32_t prefix = (uint32_t)pRange->mask;
  pUtils->netmask = (prefix == 0) ? 0 : (0xFFFFFFFFu << (32 - prefix));

  pUtils->network = pUtils->address & pUtils->netmask;
  pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF);
  pUtils->type = 1;  // mask == 32 was handled above

  return 0;
}
1084
/*
 * Check whether `ip` matches the subnet described by `pUtils`.
 *
 * Type 0 compares `ip` with the stored address directly; any other type
 * byte-swaps `ip` and tests it against [network, broadcast].
 *
 * @return non-zero on match, 0 otherwise (also 0 when pUtils is NULL).
 */
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
  if (pUtils == NULL) return false;

  if (pUtils->type != 0) {
    SIpV4Range range = {.ip = ip, .mask = 32};
    uint32_t   swapped = subnetIpRang2Int(&range);
    return pUtils->network <= swapped && swapped <= pUtils->broadcast;
  }

  return pUtils->address == ip;
}
1096

1097
/*
 * Format an IPv4 range as "a.b.c.d" or, when mask != 32, "a.b.c.d/mask".
 *
 * @param pRange  range to format.
 * @param buf     destination buffer.
 * @param cap     capacity of `buf` in bytes.
 * @return number of characters written (excluding the terminator),
 *         TSDB_CODE_INVALID_PARA for bad arguments, or
 *         TSDB_CODE_THIRDPARTY_ERROR when the address cannot be converted
 *         (including when `buf` is too small for it).
 */
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf, int32_t cap) {
  if (pRange == NULL || buf == NULL || cap <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  struct in_addr addr;
  addr.s_addr = pRange->ip;

  // Bound the conversion by the caller's capacity, not a hard-coded 32.
  int32_t err = uv_inet_ntop(AF_INET, &addr, buf, (size_t)cap);
  if (err != 0) {
    tError("failed to convert ip to string since %s", uv_strerror(err));
    return TSDB_CODE_THIRDPARTY_ERROR;
  }

  int32_t len = (int32_t)strlen(buf);

  if (pRange->mask != 32) {
    int32_t room = cap - len;  // >= 1: the terminator written above fits in cap
    int32_t n = snprintf(buf + len, (size_t)room, "/%d", (int)pRange->mask);
    if (n > 0) {
      // Report what was actually stored when the suffix got truncated.
      len += (n < room) ? n : room - 1;
    }
  }
  return len;
}
1116

1117
/*
 * Render a white list as a comma-separated string of addresses.
 *
 * @param pList  list to render; an empty list is rejected.
 * @param ppBuf  out: newly allocated string owned by the caller on success,
 *               NULL on failure.
 * @return length of the string on success, 0 on failure.
 */
int32_t transUtilSWhiteListToStr(SIpWhiteListDual* pList, char** ppBuf) {
  int32_t code = 0;
  int32_t lino = 0;
  char*   pBuf = NULL;
  int32_t len = 0;
  if (pList->num == 0) {
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _error);
  }
  int32_t cap = pList->num * IP_RESERVE_CAP;
  pBuf = taosMemoryCalloc(1, cap);
  if (pBuf == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  for (int i = 0; i < pList->num; i++) {
    SIpRange* pRange = &pList->pIpRanges[i];
    SIpAddr   addr = {0};
    code = tIpUintToStr(pRange, &addr);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t room = cap - len;
    int32_t n = snprintf(pBuf + len, (size_t)room, "%s,", IP_ADDR_STR(&addr));
    if (n < 0) {
      continue;
    }
    if (n >= room) {
      // Output was truncated: keep len inside the buffer and stop appending,
      // otherwise the next `cap - len` would go negative.
      len = cap - 1;
      break;
    }
    len += n;
  }
  if (len > 0) {
    pBuf[len - 1] = 0;  // drop the trailing separator
  }

  *ppBuf = pBuf;
_error:
  if (code != 0) {
    taosMemoryFree(pBuf);
    *ppBuf = NULL;
    len = 0;  // never report a length for a buffer that was not returned
  }

  return len;
}
1152

1153
/*
 * Test whether `ip` falls inside `range`, comparing the ipV6 view of both.
 *
 * mask == 0 matches everything; mask == 128 requires both 64-bit words to be
 * equal; otherwise the leading `mask` bits (addr[0] first, then addr[1]) are
 * compared.
 */
bool transUtilCheckDualIp(SIpRange* range, SIpRange* ip) {
  SIpV6Range* pNet = &range->ipV6;
  SIpV6Range* pHost = &ip->ipV6;

  if (pNet->mask == 0) {
    return true;
  }
  if (pNet->mask == 128) {
    return pNet->addr[0] == pHost->addr[0] && pNet->addr[1] == pHost->addr[1];
  }

  uint64_t maskHigh = 0xFFFFFFFFFFFFFFFFULL;
  uint64_t maskLow = 0;
  if (pNet->mask <= 64) {
    maskHigh = (0xFFFFFFFFFFFFFFFFULL << (64 - pNet->mask));
  } else {
    maskLow = (0xFFFFFFFFFFFFFFFFULL << (128 - pNet->mask));
  }

  // Two words agree under a mask exactly when their XOR has no masked bit set.
  uint64_t diffHigh = (pHost->addr[0] ^ pNet->addr[0]) & maskHigh;
  uint64_t diffLow = (pHost->addr[1] ^ pNet->addr[1]) & maskLow;
  return diffHigh == 0 && diffLow == 0;
}
1175

1176
/*
 * Initialise the write-request queue `wq` and pre-populate it with four
 * zeroed SWReqsWrapper entries.
 *
 * @return 0 on success; on allocation failure the queue is emptied again via
 *         destroyWQ and the error code is returned.
 */
int32_t initWQ(queue* wq) {
  int32_t code = 0;
  int     created = 0;

  QUEUE_INIT(wq);
  while (created < 4) {
    SWReqsWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
    if (pWrapper == NULL) {
      TAOS_CHECK_GOTO(terrno, NULL, _exception);
    }
    pWrapper->wreq.data = pWrapper;  // back-pointer from the uv request
    pWrapper->arg = NULL;
    QUEUE_INIT(&pWrapper->node);
    QUEUE_PUSH(wq, &pWrapper->q);
    created++;
  }
  return 0;

_exception:
  destroyWQ(wq);
  return code;
}
1194
/* Unlink and free every SWReqsWrapper still parked in `wq`. */
void destroyWQ(queue* wq) {
  for (;;) {
    if (QUEUE_IS_EMPTY(wq)) {
      break;
    }
    queue* head = QUEUE_HEAD(wq);
    QUEUE_REMOVE(head);
    SWReqsWrapper* pWrapper = QUEUE_DATA(head, SWReqsWrapper, q);
    taosMemoryFree(pWrapper);
  }
}
89,321,084✔
1202

1203
/*
 * Obtain a uv_write_t for a write.
 *
 * Takes the head wrapper from `wq` when one is available, otherwise allocates
 * a fresh zeroed wrapper. In both cases `arg` is stored in the wrapper and its
 * `node` link is reset.
 *
 * @return pointer to the wrapper's embedded request, or NULL when the queue
 *         is empty and allocation fails.
 */
uv_write_t* allocWReqFromWQ(queue* wq, void* arg) {
  SWReqsWrapper* pWrapper = NULL;

  if (QUEUE_IS_EMPTY(wq)) {
    pWrapper = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
    if (pWrapper == NULL) {
      return NULL;
    }
    pWrapper->wreq.data = pWrapper;
  } else {
    queue* head = QUEUE_HEAD(wq);
    QUEUE_REMOVE(head);
    pWrapper = QUEUE_DATA(head, SWReqsWrapper, q);
  }

  pWrapper->arg = arg;
  QUEUE_INIT(&pWrapper->node);
  return &pWrapper->wreq;
}
1223

1224
/*
 * Hand a wrapper back to `wq` for reuse: its `node` link is reset and the
 * wrapper is pushed onto the queue through its `q` link.
 */
void freeWReqToWQ(queue* wq, SWReqsWrapper* w) {
  QUEUE_INIT(&w->node);
  QUEUE_PUSH(wq, &w->q);
}
2,147,483,647✔
1228

1229
/*
 * Apply taosSetSockOpt2 to the descriptor behind a libuv handle.
 *
 * @return the result of taosSetSockOpt2, or TSDB_CODE_THIRDPARTY_ERROR when
 *         uv_fileno cannot provide the descriptor.
 */
int32_t transSetReadOption(uv_handle_t* handle) {
  int32_t fd;
  int     rc = uv_fileno((uv_handle_t*)handle, &fd);
  if (rc == 0) {
    return taosSetSockOpt2(fd);
  }

  tWarn("failed to get fd since %s", uv_err_name(rc));
  return TSDB_CODE_THIRDPARTY_ERROR;
}
1240

1241
/*
 * Build a heap-allocated SReqEpSet holding the first `numOfEps` endpoints of
 * a user SEpSet.
 *
 * On success any set previously stored in *pReqEpSet is freed and replaced.
 * On failure *pReqEpSet is left exactly as it was, so the caller never ends
 * up holding a freed pointer.
 *
 * @param pEpset     source endpoint set.
 * @param pReqEpSet  in/out: current request set (may point to NULL).
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL or invalid
 *         input, or TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
  if (pEpset == NULL || pReqEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Check the count before it sizes the copy: an out-of-range value would
  // make memcpy read past the end of *pEpset.
  if (pEpset->numOfEps <= 0 || pEpset->numOfEps > TSDB_MAX_EP_NUM) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t    size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
  SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
  if (pReq == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  memcpy((char*)pReq, (char*)pEpset, size);

  if (transValidReqEpset(pReq) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  // Replace the previous set only once the new one is known to be good.
  taosMemoryFree(*pReqEpSet);
  *pReqEpSet = pReq;
  return TSDB_CODE_SUCCESS;
}
1267

1268
/*
 * Copy a request endpoint set into a caller-provided SEpSet.
 *
 * @param pReqEpSet  source set.
 * @param pEpSet     destination.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when either pointer is
 *         NULL or the endpoint count is outside [0, TSDB_MAX_EP_NUM].
 */
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
  if (pReqEpSet == NULL || pEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // The count sizes the copy; refuse values that could write past *pEpSet.
  if (pReqEpSet->numOfEps < 0 || pReqEpSet->numOfEps > TSDB_MAX_EP_NUM) {
    return TSDB_CODE_INVALID_PARA;
  }
  memcpy((char*)pEpSet, (char*)pReqEpSet, sizeof(SReqEpSet) + sizeof(SEp) * pReqEpSet->numOfEps);
  return TSDB_CODE_SUCCESS;
}
1275

1276
/*
 * Validate a request endpoint set.
 *
 * A set is accepted when it is non-NULL, numOfEps is non-zero and at most
 * TSDB_MAX_EP_NUM, and inUse is below TSDB_MAX_EP_NUM.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_INVALID_PARA.
 */
int32_t transValidReqEpset(SReqEpSet* pReqEpSet) {
  if (pReqEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  bool countOk = !(pReqEpSet->numOfEps == 0 || pReqEpSet->numOfEps > TSDB_MAX_EP_NUM);
  bool inUseOk = !(pReqEpSet->inUse >= TSDB_MAX_EP_NUM);

  return (countOk && inUseOk) ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA;
}
1285

1286
#else
1287
#define BUFFER_CAP 4096
1288

1289
typedef struct {
1290
  int32_t      numOfThread;
1291
  STaosQueue** qhandle;
1292
  STaosQset**  qset;
1293
  int64_t      idx;
1294

1295
} MultiThreadQhandle;
1296
typedef struct TThread {
1297
  TdThread thread;
1298
  int      idx;
1299
} TThread;
1300

1301
TdThreadMutex       mutex[2];
1302
MultiThreadQhandle* multiQ[2] = {NULL, NULL};
1303
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
1304

1305
static int32_t refMgt;
1306
static int32_t svrRefMgt;
1307
static int32_t instMgt;
1308
static int32_t transSyncMsgMgt;
1309
TdThreadMutex  mutex[2];
1310

1311
TdThreadMutex tableMutex;
1312
SHashObj*     hashTable = NULL;
1313

1314
void transDestroySyncMsg(void* msg);
1315

1316
int32_t transCompressMsg(char* msg, int32_t len) {
1317
  int32_t        ret = 0;
1318
  int            compHdr = sizeof(STransCompMsg);
1319
  STransMsgHead* pHead = transHeadFromCont(msg);
1320

1321
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
1322
  if (buf == NULL) {
1323
    tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len);
1324
    ret = len;
1325
    return ret;
1326
  }
1327

1328
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
1329
  /*
1330
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
1331
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
1332
   */
1333
  if (clen > 0 && clen < len - compHdr) {
1334
    STransCompMsg* pComp = (STransCompMsg*)msg;
1335
    pComp->reserved = 0;
1336
    pComp->contLen = htonl(len);
1337
    memcpy(msg + compHdr, buf, clen);
1338

1339
    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
1340
    ret = clen + compHdr;
1341
    pHead->comp = 1;
1342
  } else {
1343
    ret = len;
1344
    pHead->comp = 0;
1345
  }
1346
  taosMemoryFree(buf);
1347
  return ret;
1348
}
1349
/* Stub in this build: leaves the message untouched and reports success. */
int32_t transDecompressMsg(char** msg, int32_t* len) {
  return 0;
}
1350

1351
void transFreeMsg(void* msg) {
1352
  if (msg == NULL) {
1353
    return;
1354
  }
1355
  tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD);
1356
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
1357
}
1358

1359
/*
 * Initialise a transport context by creating its `args` hash table
 * (initial capacity 2, UINT keys, HASH_NO_LOCK).
 */
void transCtxInit(STransCtx* ctx) {
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
1363
/*
 * Release everything held by a transport context.
 *
 * When a freeFunc is installed it is applied to every stored value and to the
 * broken-link value; the hash table is then destroyed and `args` reset to
 * NULL. A NULL context or a context without a table is a no-op.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx == NULL || ctx->args == NULL) {
    return;
  }

  // freeFunc is optional: the loop used to call it unconditionally while the
  // broken-link value below was guarded.
  if (ctx->freeFunc != NULL) {
    STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
    while (iter) {
      ctx->freeFunc(iter->val);
      iter = taosHashIterate(ctx->args, iter);
    }
    ctx->freeFunc(ctx->brokenVal.val);
  }

  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}
1377

1378
/*
 * Merge the values of `src` into `dst`.
 *
 * If `dst` has no table yet it simply takes over the table, broken-link value
 * and freeFunc of `src`. Otherwise every entry of `src` is copied into `dst`
 * and the source table is destroyed. In both cases src->args is NULL
 * afterwards, so a later cleanup of `src` cannot touch a table it no longer
 * owns.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (dst == NULL || src == NULL) {
    return;
  }
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }
  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }
  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    if (code != 0) {
      tError("failed to put val to hash, reason:%s", tstrerror(code));
    }
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
  src->args = NULL;  // the table is gone; do not leave a dangling pointer
}
1404
/*
 * Return a clone of the value stored under `key`.
 *
 * @return the object produced by the entry's clone callback, or NULL when the
 *         context has no table or no entry for `key`.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  STransCtxVal* cVal = NULL;
  if (ctx->args != NULL) {
    cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  }
  if (cVal == NULL) {
    return NULL;
  }

  void* copy = NULL;
  TAOS_UNUSED((*cVal->clone)(cVal->val, &copy));
  return copy;
}
1416
/*
 * Clone the context's broken-link value.
 *
 * @param msgType  out: set to the broken-link message type, but only when a
 *                 clone callback is installed.
 * @return the cloned value, or NULL when no clone callback is installed.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  void* copy = NULL;
  if (ctx->brokenVal.clone != NULL) {
    TAOS_UNUSED((*ctx->brokenVal.clone)(ctx->brokenVal.val, &copy));
    *msgType = ctx->brokenVal.msgType;
  }
  return copy;
}
1427

1428
/*
 * Decide whether a received message should be delivered to the client
 * callback, fixing up its ahandle when needed.
 *
 * - TDMT_SCH_TASK_RELEASE (and its response type): the context stored for the
 *   message's qId is cleaned up and removed; returns true.
 * - seq found in seqTable: the entry is removed; returns true.
 * - seq not found: an ahandle is cloned from the qId's context; returns true
 *   if such a context exists, false otherwise.
 */
bool cliMayGetAhandle(STrans* pTrans, SRpcMsg* pMsg) {
  int64_t seq = pMsg->info.seq;

  if (pMsg->msgType == TDMT_SCH_TASK_RELEASE || pMsg->msgType == TDMT_SCH_TASK_RELEASE + 1) {
    STransCtx* ctx = taosHashGet(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
    transCtxCleanup(ctx);
    taosHashRemove(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
    return true;
  }

  // Look up and remove under one lock acquisition, so the entry cannot change
  // between the two steps. The lookup that used to follow the remove had no
  // effect and is gone.
  bool seqKnown = false;
  taosThreadMutexLock(&pTrans->seqMutex);
  if (taosHashGet(pTrans->seqTable, &seq, sizeof(seq)) != NULL) {
    seqKnown = true;
    taosHashRemove(pTrans->seqTable, &seq, sizeof(seq));
  }
  taosThreadMutexUnlock(&pTrans->seqMutex);

  if (seqKnown) {
    return true;
  }

  STransCtx* ctx = taosHashGet(pTrans->sidTable, &pMsg->info.qId, sizeof(pMsg->info.qId));
  if (ctx == NULL) {
    return false;
  }
  pMsg->info.ahandle = transCtxDumpVal(ctx, pMsg->msgType);
  tError("failed to find msg type for seq:%" PRId64 ", gen ahandle for type %s", seq, TMSG_INFO(pMsg->msgType));
  return true;
}
1456

1457
void* processSvrMsg(void* arg) {
1458
  TThread* thread = (TThread*)arg;
1459

1460
  int32_t    idx = thread->idx;
1461
  static int num = 0;
1462
  STaosQall* qall;
1463
  SRpcMsg *  pRpcMsg, rpcMsg;
1464
  int        type;
1465
  SQueueInfo qinfo = {0};
1466

1467
  taosAllocateQall(&qall);
1468

1469
  while (1) {
1470
    int numOfMsgs = taosReadAllQitemsFromQset(multiQ[1]->qset[idx], qall, &qinfo);
1471
    if (numOfMsgs <= 0) break;
1472
    taosResetQitems(qall);
1473
    for (int i = 0; i < numOfMsgs; i++) {
1474
      taosGetQitem(qall, (void**)&pRpcMsg);
1475
      taosThreadMutexLock(&mutex[1]);
1476
      RpcCfp    fp = NULL;
1477
      void*     parent = NULL;
1478
      STraceId* trace = &pRpcMsg->info.traceId;
1479
      tGDebug("taos %s received from taosd", TMSG_INFO(pRpcMsg->msgType));
1480
      STrans* pTrans = NULL;
1481
      transGetCb(pRpcMsg->type, &pTrans);
1482

1483
      taosThreadMutexUnlock(&mutex[1]);
1484

1485
      if (pTrans != NULL) {
1486
        if (cliMayGetAhandle(pTrans, pRpcMsg)) {
1487
          if (pRpcMsg->info.reqWithSem == NULL) {
1488
            (pTrans->cfp)(pTrans->parent, pRpcMsg, NULL);
1489
          } else {
1490
            STransReqWithSem* reqWithSem = pRpcMsg->info.reqWithSem;
1491
            memcpy(&reqWithSem->pMsg, pRpcMsg, sizeof(SRpcMsg));
1492
            tsem_post(reqWithSem->sem);
1493
          }
1494
        } else {
1495
          tDebug("taosd %s received from taosd, ignore", TMSG_INFO(pRpcMsg->msgType));
1496
        }
1497
      }
1498
      taosFreeQitem(pRpcMsg);
1499
    }
1500
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
1501
  }
1502

1503
  taosFreeQall(qall);
1504
  return NULL;
1505
}
1506
void* procClientMsg(void* arg) {
1507
  TThread* thread = (TThread*)arg;
1508

1509
  int32_t    idx = thread->idx;
1510
  static int num = 0;
1511
  STaosQall* qall;
1512
  SRpcMsg *  pRpcMsg, rpcMsg;
1513
  int        type;
1514
  SQueueInfo qinfo = {0};
1515

1516
  taosAllocateQall(&qall);
1517

1518
  while (1) {
1519
    int numOfMsgs = taosReadAllQitemsFromQset(multiQ[0]->qset[idx], qall, &qinfo);
1520
    tDebug("%d msgs are received", numOfMsgs);
1521
    if (numOfMsgs <= 0) break;
1522
    taosResetQitems(qall);
1523
    for (int i = 0; i < numOfMsgs; i++) {
1524
      taosGetQitem(qall, (void**)&pRpcMsg);
1525

1526
      STraceId* trace = &pRpcMsg->info.traceId;
1527
      tDebug("taosc %s received from taosc", TMSG_INFO(pRpcMsg->msgType));
1528
      RpcCfp fp = NULL;
1529
      // void*  parent;
1530
      STrans* pTrans = NULL;
1531
      taosThreadMutexLock(&mutex[1]);
1532
      if ((pRpcMsg->type & TD_ASTRA_DSVR) != 0) {
1533
        transGetCb(TD_ASTRA_DSVR, &pTrans);
1534
      }
1535
      taosThreadMutexUnlock(&mutex[1]);
1536
      if (pTrans->cfp != NULL) {
1537
        (pTrans->cfp)(pTrans->parent, pRpcMsg, NULL);
1538
      } else {
1539
        tError("taosc failed to find callback for msg type:%s", TMSG_INFO(pRpcMsg->msgType));
1540
      }
1541
      taosFreeQitem(pRpcMsg);
1542
    }
1543
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
1544
  }
1545

1546
  taosFreeQall(qall);
1547
  return NULL;
1548
}
1549
static void transInitEnv() {
1550
  refMgt = transOpenRefMgt(50000, transDestroyExHandle);
1551
  svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
1552
  instMgt = taosOpenRef(50, rpcCloseImpl);
1553
  transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
1554

1555
  taosThreadMutexInit(&tableMutex, NULL);
1556
  hashTable = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
1557

1558
  int32_t numOfAthread = 1;
1559

1560
  multiQ[0] = taosMemoryMalloc(sizeof(MultiThreadQhandle));
1561
  multiQ[0]->numOfThread = numOfAthread;
1562
  multiQ[0]->qhandle = (STaosQueue**)taosMemoryMalloc(sizeof(STaosQueue*) * numOfAthread);
1563
  multiQ[0]->qset = (STaosQset**)taosMemoryMalloc(sizeof(STaosQset*) * numOfAthread);
1564

1565
  taosThreadMutexInit(&mutex[0], NULL);
1566

1567
  for (int i = 0; i < numOfAthread; i++) {
1568
    taosOpenQueue(&(multiQ[0]->qhandle[i]));
1569
    taosOpenQset(&multiQ[0]->qset[i]);
1570
    taosAddIntoQset(multiQ[0]->qset[i], multiQ[0]->qhandle[i], NULL);
1571
  }
1572
  {
1573
    TThread* threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
1574
    for (int i = 0; i < numOfAthread; i++) {
1575
      threads[i].idx = i;
1576
      taosThreadCreate(&(threads[i].thread), NULL, procClientMsg, (void*)&threads[i]);
1577
    }
1578
  }
1579

1580
  multiQ[1] = taosMemoryMalloc(sizeof(MultiThreadQhandle));
1581
  multiQ[1]->numOfThread = numOfAthread;
1582
  multiQ[1]->qhandle = (STaosQueue**)taosMemoryMalloc(sizeof(STaosQueue*) * numOfAthread);
1583
  multiQ[1]->qset = (STaosQset**)taosMemoryMalloc(sizeof(STaosQset*) * numOfAthread);
1584
  taosThreadMutexInit(&mutex[1], NULL);
1585

1586
  for (int i = 0; i < numOfAthread; i++) {
1587
    taosOpenQueue(&(multiQ[1]->qhandle[i]));
1588
    taosOpenQset(&multiQ[1]->qset[i]);
1589
    taosAddIntoQset(multiQ[1]->qset[i], multiQ[1]->qhandle[i], NULL);
1590
  }
1591
  {
1592
    TThread* threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
1593
    for (int i = 0; i < numOfAthread; i++) {
1594
      threads[i].idx = i;
1595
      taosThreadCreate(&(threads[i].thread), NULL, processSvrMsg, (void*)&threads[i]);
1596
    }
1597
  }
1598
}
1599
static void transDestroyEnv() {
1600
  transCloseRefMgt(refMgt);
1601
  transCloseRefMgt(svrRefMgt);
1602
}
1603

1604
typedef struct {
1605
  void (*fp)(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
1606
  RPC_TYPE type;
1607
  void*    parant;
1608
  STrans*  pTransport;
1609
} FP_TYPE;
1610
/*
 * Register (or replace) the transport that handles messages of `type`.
 *
 * @return the result of taosHashPut: 0 on success, an error code when the
 *         entry could not be stored.
 */
int32_t transUpdateCb(RPC_TYPE type, STrans* pTransport) {
  taosThreadMutexLock(&tableMutex);

  FP_TYPE t = {.type = type, .pTransport = pTransport};
  int32_t code = taosHashPut(hashTable, &type, sizeof(type), &t, sizeof(t));
  taosThreadMutexUnlock(&tableMutex);

  // Propagate the failure instead of always claiming success.
  return code;
}
1618
/*
 * Look up the transport registered for `type`.
 *
 * @param ppTransport  out: set to the registered transport on success; not
 *                     written when nothing is registered.
 * @return 0 on success, TSDB_CODE_INVALID_MSG when `type` is unknown.
 */
int32_t transGetCb(RPC_TYPE type, STrans** ppTransport) {
  int32_t code = TSDB_CODE_INVALID_MSG;

  taosThreadMutexLock(&tableMutex);
  FP_TYPE* entry = taosHashGet(hashTable, &type, sizeof(type));
  if (entry != NULL) {
    *ppTransport = entry->pTransport;
    code = 0;
  }
  taosThreadMutexUnlock(&tableMutex);

  return code;
}
1632

1633
/*
 * Queue a copy of `pMsg` on the request queue group (multiQ[0]); the target
 * queue is chosen round-robin through multiQ[0]->idx.
 *
 * NOTE(review): the idx increment is not atomic - confirm callers are
 * serialised or switch to an atomic add.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA when the queues are not
 *         initialised, or the error from item allocation / queue write.
 */
int32_t transSendReq(STrans* pTransport, SRpcMsg* pMsg, void* pEpSet) {
  if (multiQ[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SRpcMsg* pTemp = NULL;
  int32_t  code = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void**)&pTemp);
  if (code != 0 || pTemp == NULL) {
    // Never copy into an item that was not allocated.
    return (code != 0) ? code : TSDB_CODE_OUT_OF_MEMORY;
  }
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));

  int64_t cidx = multiQ[0]->idx++;
  int32_t idx = cidx % (multiQ[0]->numOfThread);
  tDebug("taos request is sent , type:%s, contLen:%d, item:%p", TMSG_INFO(pMsg->msgType), pMsg->contLen, pTemp);
  return taosWriteQitem(multiQ[0]->qhandle[idx], pTemp);
}
1645
int32_t transSendResp(const SRpcMsg* pMsg) {
1646
  SRpcMsg* pTemp;
1647

1648
  taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void**)&pTemp);
1649
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));
1650

1651
  int64_t cidx = multiQ[1]->idx++;
1652
  int32_t idx = cidx % (multiQ[1]->numOfThread);
1653
  tDebug("taos resp is sent to, type:%s, contLen:%d, item:%p", TMSG_INFO(pMsg->msgType), pMsg->contLen, pTemp);
1654
  taosWriteQitem(multiQ[1]->qhandle[idx], pTemp);
1655
  return 0;
1656
}
1657

1658
int32_t transInit() {
1659
  // init env
1660
  int32_t code = taosThreadOnce(&transModuleInit, transInitEnv);
1661
  if (code != 0) {
1662
    code = TAOS_SYSTEM_ERROR(ERRNO);
1663
  }
1664
  return code;
1665
}
1666

1667
/* Return the id of the reference pool stored in refMgt. */
int32_t transGetRefMgt() {
  return refMgt;
}
1668
/* Return the id of the reference pool stored in svrRefMgt. */
int32_t transGetSvrRefMgt() {
  return svrRefMgt;
}
1669
/* Return the id of the reference pool stored in instMgt. */
int32_t transGetInstMgt() {
  return instMgt;
}
1670
/* Return the id of the reference pool stored in transSyncMsgMgt. */
int32_t transGetSyncMsgMgt() {
  return transSyncMsgMgt;
}
1671

1672
/* Tear down the module environment by delegating to transDestroyEnv. */
void transCleanup() {
  transDestroyEnv();
}
1677
/*
 * Open a reference pool; thin wrapper over taosOpenRef.
 *
 * @return whatever taosOpenRef returns.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
1681
/* Close the reference pool `mgt`; thin wrapper over taosCloseRef. */
void transCloseRefMgt(int32_t mgt) {
  taosCloseRef(mgt);
}
1686
/*
 * Register `p` in the reference pool `refMgt`.
 *
 * @return the value returned by taosAddRef.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
1690
/*
 * Remove `refId` from the reference pool `refMgt`.
 * The result of taosRemoveRef is intentionally ignored.
 */
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
  TAOS_UNUSED(taosRemoveRef(refMgt, refId));
}
1695

1696
/*
 * Acquire the handle registered under `refId` in pool `refMgt`.
 *
 * @return the result of taosAcquireRef, cast to void*.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}
1700

1701
/*
 * Release a handle previously acquired from pool `refMgt`.
 * The result of taosReleaseRef is intentionally ignored.
 */
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
  TAOS_UNUSED(taosReleaseRef(refMgt, refId));
}
1706
void transDestroyExHandle(void* handle) {
1707
  if (handle == NULL) {
1708
    return;
1709
  }
1710
  SExHandle* eh = handle;
1711
  if (!QUEUE_IS_EMPTY(&eh->q)) {
1712
    tDebug("handle %p mem leak", handle);
1713
  }
1714
  tDebug("free exhandle %p", handle);
1715
  taosMemoryFree(handle);
1716
  return;
1717
}
1718

1719
void transDestroySyncMsg(void* msg) {
1720
  if (msg == NULL) return;
1721

1722
  STransSyncMsg* pSyncMsg = msg;
1723
  TAOS_UNUSED(tsem2_destroy(pSyncMsg->pSem));
1724
  taosMemoryFree(pSyncMsg->pSem);
1725
  transFreeMsg(pSyncMsg->pRsp->pCont);
1726
  taosMemoryFree(pSyncMsg->pRsp);
1727
  taosMemoryFree(pSyncMsg);
1728
  return;
1729
}
1730

1731
/* Stub in this build: always returns 0. */
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
  return 0;
}
1732
/* Stub in this build: leaves `pUtils` untouched and returns 0. */
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
  return 0;
}
1733
/* Stub in this build: always returns 0. */
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
  return 0;
}
1734

1735
/* Stub in this build: writes nothing to `buf` and returns 0. */
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
  return 0;
}
1736

1737
/* Stub in this build: does not set *ppBuf and returns 0. */
int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
  return 0;
}
1738

1739
/*
 * Initialise a connection buffer with BUFFER_CAP zeroed bytes and reset its
 * bookkeeping (left = -1, len = total = invalid = 0).
 *
 * @return 0 on success, terrno when the allocation fails.
 */
int32_t transInitBuffer(SConnBuffer* buf) {
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  if (buf->buf != NULL) {
    buf->cap = BUFFER_CAP;
    buf->left = -1;
    buf->len = 0;
    buf->total = 0;
    buf->invalid = 0;
    return 0;
  }
  return terrno;
}
1752
// Release the storage owned by a connection buffer and clear the pointer so
// it cannot be reused by accident. The SConnBuffer struct itself is not freed.
void transDestroyBuffer(SConnBuffer* p) {
  void* mem = p->buf;
  p->buf = NULL;
  taosMemoryFree(mem);
}
1756

1757
// Reset a connection buffer to its initial state, shrinking the storage back
// to BUFFER_CAP when it has grown beyond that.
//
// Returns 0 on success. If the shrinking reallocation fails, terrno is
// returned and the buffer is left exactly as it was: the realloc result is
// held in a temporary so the existing allocation is not lost, and cap is
// only updated once the new allocation is in place.
// NOTE(review): assumes taosMemoryRealloc follows realloc() semantics and
// leaves the original block valid on failure - confirm.
int32_t transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk == NULL) {
      return terrno;
    }
    p->buf = shrunk;
    p->cap = BUFFER_CAP;
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}
1772

1773
// Copy the message currently held in connBuf (its first `total` bytes) into
// a newly allocated block and then reset connBuf via transResetBuffer().
//
// connBuf  - source buffer; must have left == 0 and total > 0.
// buf      - out: receives the allocated copy on success; set to NULL when
//            the allocation or the reset fails.
// resetBuf - forwarded to transResetBuffer().
// len      - out: number of bytes copied on success, -1 when the content is
//            shorter than STransMsgHead or the buffer is flagged invalid.
//            Not written on the other failure paths.
//
// Returns the code from transResetBuffer() on success, TSDB_CODE_INVALID_MSG
// for an unusable buffer, terrno on allocation failure, or the negative code
// from transResetBuffer(). The copy is freed on the reset-failure path so it
// is not leaked.
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf, int32_t* len) {
  static const int HEADSIZE = sizeof(STransMsgHead);
  int32_t          code = 0;
  SConnBuffer*     p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    *len = -1;
    return TSDB_CODE_INVALID_MSG;
  }

  *buf = taosMemoryCalloc(1, total);
  if (*buf == NULL) {
    return terrno;
  }
  memcpy(*buf, p->buf, total);

  if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
    // Do not hand a half-processed message to the caller on failure.
    taosMemoryFree(*buf);
    *buf = NULL;
    return code;
  }

  *len = total;
  return code;
}
1797

1798
// Discard the first `total` bytes of connBuf after they have been consumed.
//
// - total <  len: the unconsumed tail is moved to the front and len becomes
//   its size.
// - total == len: the buffer becomes empty; if it has grown beyond BUFFER_CAP
//   and resetBuf is non-zero, the storage is shrunk back to BUFFER_CAP.
// - total >  len: inconsistent state, logged and rejected.
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG for the inconsistent case, or
// terrno if the shrinking reallocation fails. On that failure the old
// storage and cap are kept: the realloc result goes through a temporary and
// cap is only lowered after the reallocation succeeded.
// NOTE(review): assumes taosMemoryRealloc follows realloc() semantics and
// leaves the original block valid on failure - confirm.
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
  SConnBuffer* p = connBuf;
  if (p->total < p->len) {
    int left = p->len - p->total;
    memmove(p->buf, p->buf + p->total, left);
    p->left = -1;
    p->total = 0;
    p->len = left;
  } else if (p->total == p->len) {
    p->left = -1;
    p->total = 0;
    p->len = 0;
    if (p->cap > BUFFER_CAP && resetBuf) {
      void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
      if (shrunk == NULL) {
        return terrno;
      }
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  } else {
    tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
    return TSDB_CODE_INVALID_MSG;
  }
  return 0;
}
1825

1826
// Build a heap-allocated SReqEpSet from a user SEpSet and store it in
// *pReqEpSet, freeing whatever *pReqEpSet pointed to before.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when either
// argument is NULL or the copied set fails transValidReqEpset(), and
// TSDB_CODE_OUT_OF_MEMORY when the allocation fails (in which case
// *pReqEpSet is left untouched).
//
// After the previous set has been freed, *pReqEpSet is set to NULL at once so
// that a validation failure does not leave the caller with a dangling pointer.
// NOTE(review): the copy length is derived from pEpset->numOfEps before that
// value is validated; confirm callers never pass a count larger than the
// source structure actually holds.
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
  if (pEpset == NULL || pReqEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t    size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
  SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
  if (pReq == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  memcpy((char*)pReq, (const char*)pEpset, size);

  // clear previous
  taosMemoryFree(*pReqEpSet);
  *pReqEpSet = NULL;

  if (transValidReqEpset(pReq) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  *pReqEpSet = pReq;
  return TSDB_CODE_SUCCESS;
}
1852

1853
// Copy a request endpoint set into a caller-provided SEpSet.
//
// Returns TSDB_CODE_INVALID_PARA when either pointer is NULL, otherwise
// TSDB_CODE_SUCCESS after copying the header plus numOfEps endpoints.
// NOTE(review): numOfEps is not range-checked here; presumably callers only
// pass sets that already passed transValidReqEpset() - confirm.
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
  if (pReqEpSet == NULL || pEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  memcpy((char*)pEpSet, (const char*)pReqEpSet, sizeof(SReqEpSet) + sizeof(SEp) * pReqEpSet->numOfEps);
  return TSDB_CODE_SUCCESS;
}
1860

1861
// Sanity-check a request endpoint set.
// Returns TSDB_CODE_INVALID_PARA when the pointer is NULL, when numOfEps is
// zero or above TSDB_MAX_EP_NUM, or when inUse is not below TSDB_MAX_EP_NUM;
// TSDB_CODE_SUCCESS otherwise.
int32_t transValidReqEpset(SReqEpSet* pReqEpSet) {
  if (pReqEpSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pReqEpSet->numOfEps == 0) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (pReqEpSet->numOfEps > TSDB_MAX_EP_NUM) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (pReqEpSet->inUse >= TSDB_MAX_EP_NUM) {
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
1870

1871
#endif  // TD_ASTRA_RPC
1872

1873
// Prepare an STransQueue for use: empty list, zero size, remembered element
// destructor, and the `inited` flag set. Always returns 0.
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
  QUEUE_INIT(&wq->node);
  wq->size = 0;
  wq->freeFunc = freeFunc;
  wq->inited = 1;
  return 0;
}
1880
// Link `arg` (treated as a `queue` node) into q with QUEUE_PUSH and bump the
// element count.
void transQueuePush(STransQueue* q, void* arg) {
  queue* entry = arg;
  QUEUE_PUSH(&q->node, entry);
  q->size += 1;
}
2,147,483,647✔
1885
// Unlink and return the head node of q, or NULL when the size counter is 0.
void* transQueuePop(STransQueue* q) {
  queue* first = NULL;
  if (q->size != 0) {
    first = QUEUE_HEAD(&q->node);
    QUEUE_REMOVE(first);
    q->size -= 1;
  }
  return first;
}
1893
// Report the element count tracked in q->size.
int32_t transQueueSize(STransQueue* q) {
  int32_t count = q->size;
  return count;
}
2,147,483,647✔
1894

1895
// Return the node at position idx (0 = head) without unlinking it, or NULL
// when the queue is empty, idx is negative, or idx runs past the last node.
//
// The previous implementation re-read the first node on every iteration and
// returned NULL on all paths, so it could never yield an element.
void* transQueueGet(STransQueue* q, int idx) {
  if (q->size == 0 || idx < 0) return NULL;

  queue* node = QUEUE_NEXT(&q->node);
  while (idx-- > 0) {
    node = QUEUE_NEXT(node);
    // Reaching the sentinel again means idx is beyond the end of the list.
    if (node == &q->node) return NULL;
  }
  return node;
}
1904

1905
// Walk q and move every node accepted by `filter(node, arg)` onto the list
// headed by `dst`, decrementing q->size for each moved node. The walk stops
// early once `size` nodes have been moved (the counter is decremented per
// move and compared against 0). A NULL filter moves nothing.
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
  queue* out = dst;
  queue* following = NULL;

  for (queue* cur = QUEUE_NEXT(&q->node); cur != &q->node; cur = following) {
    // Capture the successor first: cur may be unlinked below.
    following = QUEUE_NEXT(cur);

    if (filter == NULL || !filter(cur, arg)) {
      continue;
    }

    QUEUE_REMOVE(cur);
    q->size--;
    QUEUE_PUSH(out, cur);

    size -= 1;
    if (size == 0) {
      break;
    }
  }
}
2,147,483,647✔
1921

1922
// Peek at the head node of q without unlinking it; NULL when q->size is 0.
void* tranQueueHead(STransQueue* q) {
  if (q->size != 0) {
    queue* first = QUEUE_HEAD(&q->node);
    return first;
  }
  return NULL;
}
1928

1929
// Placeholder: removal by index is not implemented for the list-based queue.
// Neither argument is used and the result is always NULL.
void* transQueueRm(STransQueue* q, int i) {
  (void)q;
  (void)i;
  return NULL;
}
1941

1942
// Unlink node `e` and decrement q->size. Does nothing when q->size is 0.
// NOTE(review): no check is made that `e` is actually linked into q.
void transQueueRemove(STransQueue* q, void* e) {
  if (q->size != 0) {
    queue* target = e;
    QUEUE_REMOVE(target);
    q->size -= 1;
  }
}
1948

1949
// True when the size counter of q is 0.
bool transQueueEmpty(STransQueue* q) {
  return q->size == 0;
}
2,147,483,647✔
1950

1951
// Drain q: every node is unlinked, passed to q->freeFunc when one is set, and
// the size counter is decremented. A queue whose `inited` flag is 0 is left
// alone.
void transQueueClear(STransQueue* q) {
  if (q->inited != 0) {
    while (!QUEUE_IS_EMPTY(&q->node)) {
      queue* entry = QUEUE_HEAD(&q->node);
      QUEUE_REMOVE(entry);
      if (q->freeFunc != NULL) {
        q->freeFunc(entry);
      }
      q->size -= 1;
    }
  }
}
1960
// Destroying a queue is the same as clearing it; see transQueueClear().
void transQueueDestroy(STransQueue* q) {
  transQueueClear(q);
}
36,762,903✔
1961

1962
// Trace-log an endpoint set as "epset:{0. fqdn:port, 1. fqdn:port}, inUse:N".
// A NULL set is logged as "NULL epset".
//
// The text is assembled in a fixed 512-byte buffer. snprintf() returns the
// length it *would* have written, so the running offset is clamped to the
// buffer capacity after each call; without that, a truncated write would push
// the offset past the end and make the remaining-size argument wrap around.
// Output that does not fit is simply truncated.
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char         buf[512] = {0};
  const size_t cap = sizeof(buf);
  size_t       used = 0;

  int n = snprintf(buf, cap, "epset:{");
  if (n > 0) {
    used = (size_t)n;
  }

  for (int i = 0; i < pEpSet->numOfEps && used < cap - 1; i++) {
    // The last endpoint is not followed by a separator.
    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";
    n = snprintf(buf + used, cap - used, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0) {
      break;
    }
    used += (size_t)n;
    if (used >= cap) {
      used = cap - 1;  // truncated: buffer is full (NUL already in place)
    }
  }

  if (used < cap - 1) {
    (void)snprintf(buf + used, cap - used, "}");
  }
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
1979
// Compare two request endpoint sets.
// Two NULL pointers are equal; exactly one NULL is not. Otherwise the sets
// are equal when numOfEps, inUse and every endpoint's fqdn and port match.
// An fqdn whose length is TSDB_FQDN_LEN or more is logged as invalid and
// makes the comparison fail.
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
  if (a == NULL || b == NULL) {
    return a == NULL && b == NULL;
  }

  if (a->numOfEps != b->numOfEps) {
    return false;
  }
  if (a->inUse != b->inUse) {
    return false;
  }

  for (int i = 0; i < a->numOfEps; i++) {
    int32_t lenA = strlen(a->eps[i].fqdn);
    int32_t lenB = strlen(b->eps[i].fqdn);
    if (lenA >= TSDB_FQDN_LEN || lenB >= TSDB_FQDN_LEN) {
      tWarn("get invalid epset, a:%s, b:%s", a->eps[i].fqdn, b->eps[i].fqdn);
      return false;
    }

    if (lenA != lenB) {
      return false;
    }
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, lenA) != 0) {
      return false;
    }
    if (a->eps[i].port != b->eps[i].port) {
      return false;
    }
  }
  return true;
}
2003
// Compare a request endpoint set with a user endpoint set: true when both
// hold the same number of endpoints and each pair agrees on fqdn (compared
// over at most TSDB_FQDN_LEN characters) and port. inUse is not compared.
// Neither pointer is checked for NULL.
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) {
    return false;
  }

  for (int i = 0; i < a->numOfEps; i++) {
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0) {
      return false;
    }
    if (a->eps[i].port != b->eps[i].port) {
      return false;
    }
  }
  return true;
}
2014

2015
int32_t transReloadTlsConfig(void* handle, int8_t type) {
×
2016
  int32_t code = 0;
×
2017

2018
   
2019
  if (type == TAOS_CONN_CLIENT) {
×
2020
    code = transReloadClientTlsConfig(handle);
×
2021
  } else if (type == TAOS_CONN_SERVER) {
×
2022
    code = transReloadServerTlsConfig(handle);
×
2023
  } else {
2024
    code = TSDB_CODE_INVALID_PARA;
×
2025
  }
2026
  return code;
×
2027
}
2028
STrans* transInstAcquire(int64_t mgtId, int64_t instId) {
2,147,483,647✔
2029
  STrans* ppInst = NULL;
2,147,483,647✔
2030
  int32_t code = transCacheAcquireById(instId, &ppInst);
2,147,483,647✔
2031
  if (code == TSDB_CODE_SUCCESS) {
2,147,483,647✔
2032
    return ppInst;
2,147,483,647✔
2033
  } else {
2034
    return NULL;
8,274✔
2035
  }
2036
}
2037

2038
// Release a transport instance by forwarding instId to
// transCacheReleaseByRefId().
void transInstRelease(int64_t instId) {
  transCacheReleaseByRefId(instId);
}
2,147,483,647✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc