• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.06
/source/libs/transport/src/thttp.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifndef TD_ASTRA_RPC
17
#define _DEFAULT_SOURCE
18
// clang-format off
19
#include "zlib.h"
20
#include "thttp.h"
21
#include <uv.h>
22
#include "taoserror.h"
23
#include "transComm.h"
24

25
// clang-format on
26

27
// Size in bytes of the per-connection receive buffer handed to libuv.
#define HTTP_RECV_BUF_SIZE 1024
28

29
// Reference-set id under which SHttpModule instances (channels) are registered.
static int32_t httpRefMgt = 0;
30
// Window, in seconds, during which a destination that just failed to connect is skipped.
static int32_t FAST_FAILURE_LIMIT = 1;
31

32
// Channel id of the lazily created default channel; -1 while none exists.
static int64_t httpDefaultChanId = -1;
33

34
// Monotonic sequence number assigned to every message (updated atomically).
static int64_t httpSeqNum = 0;
35
// Reference-set id under which SHttpRecvBuf response buffers are registered.
static int32_t httpRecvRefMgt = 0;
36

37
// One HTTP report channel: a private libuv loop, the thread that runs it and
// the async queue used to hand messages to that thread.
typedef struct SHttpModule {
38
  uv_loop_t*  loop;  // event loop run by httpThread
39
  SAsyncPool* asyncPool;  // cross-thread message queue drained by httpAsyncCb
40
  TdThread    thread;  // worker thread executing uv_run on loop
41
  SHashObj*   connStatusTable;  // "server:port" -> timestamp (sec) of last connect failure
42
  SHashObj*   connPool;  // NOTE(review): not referenced in the visible part of this file
43
  int8_t      quit;  // non-zero once the channel is shutting down (read atomically)
44
  int16_t     connNum;  // incremented on successful connect, decremented when a client closes
45
} SHttpModule;
46

47
// A queued report request (or a quit marker) travelling from the caller
// thread to the channel's worker thread.
typedef struct SHttpMsg {
48
  queue         q;  // link in the async pool queue
49
  char*         server;  // heap copy of the destination host
50
  char*         uri;  // heap copy of the request URI
51
  int32_t       port;  // destination port
52
  char*         cont;  // heap copy of the request body
53
  int32_t       len;  // body length in bytes
54
  EHttpCompFlag flag;  // HTTP_FLAT or HTTP_GZIP
55
  int8_t        quit;  // 1 for the quit marker sent by httpSendQuit, 0 for a report
56
  int64_t       chanId;  // channel this message belongs to
57
  int64_t       seq;  // sequence number taken from httpSeqNum
58
  char*         qid;  // optional heap copy of the X-QID header value, may be NULL
59

60
  int64_t recvBufRid;  // ref id of an SHttpRecvBuf for the response; values <= 0 are ignored by clientRecvCb
61
} SHttpMsg;
62

63
// Per-request connection state; created in httpHandleReq and released by
// destroyHttpClient.
typedef struct SHttpClient {
64
  uv_connect_t       conn;  // connect request passed to uv_tcp_connect
65
  uv_tcp_t           tcp;  // TCP handle; its data pointer refers back to this struct
66
  uv_write_t         req;  // write request passed to uv_write
67
  uv_buf_t*          wbuf;  // two buffers: [0] request header, [1] request body
68
  char*              rbuf;  // receive buffer of HTTP_RECV_BUF_SIZE bytes
69
  char*              addr;  // destination host string (taken over from SHttpMsg.server)
70
  uint16_t           port;  // destination port
71
  struct sockaddr_in dest;  // resolved destination address
72
  int64_t            chanId;  // owning channel id
73
  int64_t            seq;  // sequence number of the originating message
74

75
  int64_t recvBufRid;  // ref id of the SHttpRecvBuf that receives the response, if > 0
76
} SHttpClient;
77

78
// Queue head wrapper. NOTE(review): not referenced in the visible part of this file.
typedef struct SHttpConnList {
79
  queue q;
80

81
} SHttpConnList;
82

83
// Holder for the first chunk of response data received for a request;
// filled by clientRecvCb under the write latch.
typedef struct {
84
  char*    pBuf;  // heap copy of the received bytes (allocated with one extra zero byte)
85
  int32_t  nBuf;  // number of received bytes stored in pBuf
86
  SRWLatch latch;  // guards pBuf/nBuf/inited
87
  int8_t   inited;  // set to 1 once pBuf has been filled; later data is not stored
88
} SHttpRecvBuf;
89

90
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
91
static void         transHttpEnvInit();
92

93
static void    httpHandleReq(SHttpMsg* msg);
94
static void    httpHandleQuit(SHttpMsg* msg);
95
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
96

97
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
98
                             EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg);
99
static void    httpDestroyMsg(SHttpMsg* msg);
100

101
static bool    httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
102
static void    httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
103
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
104
                                      EHttpCompFlag flag);
105
static void    httpModuleDestroy(SHttpModule* http);
106

107
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
108
                                            int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid);
109

110
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
111
                                             int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
112
                                             int64_t rid);
113
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead,
3✔
114
                                   int32_t headLen,
115

116
                                   EHttpCompFlag flag) {
117
  int32_t code = 0;
3✔
118
  int32_t len = 0;
3✔
119
  if (flag == HTTP_FLAT) {
3✔
120
    if (qid == NULL) {
×
121
      len = snprintf(pHead, headLen,
×
122
                     "POST %s HTTP/1.1\n"
123
                     "Host: %s\n"
124
                     "Content-Type: application/json\n"
125
                     "Content-Length: %d\n\n",
126
                     uri, server, contLen);
127
    } else {
128
      len = snprintf(pHead, headLen,
×
129
                     "POST %s HTTP/1.1\n"
130
                     "Host: %s\n"
131
                     "X-QID: %s\n"
132
                     "Content-Type: application/json\n"
133
                     "Content-Length: %d\n\n",
134
                     uri, server, qid, contLen);
135
    }
136
    if (len < 0 || len >= headLen) {
×
137
      code = TSDB_CODE_OUT_OF_RANGE;
×
138
    }
139
  } else if (flag == HTTP_GZIP) {
3✔
140
    if (qid == NULL) {
3✔
141
      len = snprintf(pHead, headLen,
×
142
                     "POST %s HTTP/1.1\n"
143
                     "Host: %s\n"
144
                     "Content-Type: application/json\n"
145
                     "Content-Encoding: gzip\n"
146
                     "Content-Length: %d\n\n",
147
                     uri, server, contLen);
148
    } else {
149
      len = snprintf(pHead, headLen,
3✔
150
                     "POST %s HTTP/1.1\n"
151
                     "Host: %s\n"
152
                     "X-QID: %s\n"
153
                     "Content-Type: application/json\n"
154
                     "Content-Encoding: gzip\n"
155
                     "Content-Length: %d\n\n",
156
                     uri, server, qid, contLen);
157
    }
158
    if (len < 0 || len >= headLen) {
3✔
159
      code = TSDB_CODE_OUT_OF_RANGE;
×
160
    }
161
  } else {
162
    code = TSDB_CODE_INVALID_PARA;
×
163
  }
164
  return code;
3✔
165
}
166

167
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
3✔
168
  int32_t code = 0;
3✔
169
  int32_t destLen = srcLen;
3✔
170
  void*   pDest = taosMemoryMalloc(destLen);
3✔
171

172
  if (pDest == NULL) {
3✔
173
    code = terrno;
×
174
    goto _OVER;
×
175
  }
176

177
  z_stream gzipStream = {0};
3✔
178
  gzipStream.zalloc = (alloc_func)0;
3✔
179
  gzipStream.zfree = (free_func)0;
3✔
180
  gzipStream.opaque = (voidpf)0;
3✔
181
  if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
3✔
182
    code = TSDB_CODE_OUT_OF_MEMORY;
×
183
    goto _OVER;
×
184
  }
185

186
  gzipStream.next_in = (Bytef*)pSrc;
3✔
187
  gzipStream.avail_in = (uLong)srcLen;
3✔
188
  gzipStream.next_out = (Bytef*)pDest;
3✔
189
  gzipStream.avail_out = (uLong)(destLen);
3✔
190

191
  while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
6✔
192
    if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
3✔
193
      code = TSDB_CODE_COMPRESS_ERROR;
×
194
      goto _OVER;
×
195
    }
196
  }
197

198
  if (gzipStream.avail_in != 0) {
3✔
199
    code = TSDB_CODE_COMPRESS_ERROR;
×
200
    goto _OVER;
×
201
  }
202

203
  int32_t err = 0;
3✔
204
  while (1) {
205
    if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) {
3✔
206
      break;
3✔
207
    }
208
    if (err != Z_OK) {
×
209
      code = TSDB_CODE_COMPRESS_ERROR;
×
210
      goto _OVER;
×
211
    }
212
  }
213

214
  if (deflateEnd(&gzipStream) != Z_OK) {
3✔
215
    code = TSDB_CODE_COMPRESS_ERROR;
×
216
    goto _OVER;
×
217
  }
218

219
  if (gzipStream.total_out >= srcLen) {
3✔
220
    code = TSDB_CODE_COMPRESS_ERROR;
×
221
    goto _OVER;
×
222
  }
223

224
  code = 0;
3✔
225

226
_OVER:
3✔
227
  if (code == 0) {
3✔
228
    memcpy(pSrc, pDest, gzipStream.total_out);
3✔
229
    code = gzipStream.total_out;
3✔
230
  }
231
  taosMemoryFree(pDest);
3✔
232
  return code;
3✔
233
}
234

235
/*
 * Resolve server to an IPv4 address and fill dest with that address and port.
 *
 * Returns 0 on success, TSDB_CODE_RPC_FQDN_ERROR when name resolution fails
 * and TSDB_CODE_THIRDPARTY_ERROR when libuv rejects the address.
 */
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
  uint32_t ipv4 = 0;
  int32_t  rc = taosGetIpv4FromFqdn(server, &ipv4);
  if (rc) {
    tError("http-report failed to resolving domain names %s, reason:%s", server, tstrerror(rc));
    return TSDB_CODE_RPC_FQDN_ERROR;
  }

  // libuv wants the textual form of the address.
  char ipText[TD_IP_LEN] = {0};
  taosInetNtoa(ipText, ipv4);

  int uvRc = uv_ip4_addr(ipText, port, dest);
  if (uvRc == 0) {
    return 0;
  }

  tError("http-report failed to get addr, reason:%s", uv_err_name(uvRc));
  return TSDB_CODE_THIRDPARTY_ERROR;
}
251

252
static void* httpThread(void* arg) {
1✔
253
  SHttpModule* http = (SHttpModule*)arg;
1✔
254
  setThreadName("http-cli-send-thread");
1✔
255
  TAOS_UNUSED(uv_run(http->loop, UV_RUN_DEFAULT));
1✔
256
  return NULL;
×
257
}
258

259
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
3✔
260
                             EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg) {
261
  int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
3✔
262
  if (server == NULL || uri == NULL) {
3✔
263
    tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64, chanId, seqNum);
×
264
    *httpMsg = NULL;
×
265
    return TSDB_CODE_INVALID_PARA;
×
266
  }
267

268
  if (pCont == NULL || contLen == 0) {
3✔
269
    tError("http-report failed to report empty packet, chanId:%" PRId64 ", seq:%" PRId64, chanId, seqNum);
×
270
    *httpMsg = NULL;
×
271
    return TSDB_CODE_INVALID_PARA;
×
272
  }
273

274
  SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
3✔
275
  if (msg == NULL) {
3✔
276
    *httpMsg = NULL;
×
277
    return terrno;
×
278
  }
279

280
  msg->seq = seqNum;
3✔
281
  msg->port = port;
3✔
282
  msg->server = taosStrdup(server);
3✔
283
  msg->uri = taosStrdup(uri);
3✔
284
  msg->cont = taosMemoryMalloc(contLen);
3✔
285
  if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
3✔
286
    httpDestroyMsg(msg);
×
287
    *httpMsg = NULL;
×
288
    return terrno;
×
289
  }
290

291
  if (qid != NULL) {
3✔
292
    msg->qid = taosStrdup(qid);
3✔
293
    if (msg->qid == NULL) {
3✔
294
      httpDestroyMsg(msg);
×
295
      *httpMsg = NULL;
×
296
      return terrno;
×
297
    }
298
  } else {
299
    msg->qid = NULL;
×
300
  }
301

302
  memcpy(msg->cont, pCont, contLen);
3✔
303
  msg->len = contLen;
3✔
304
  msg->flag = flag;
3✔
305
  msg->quit = 0;
3✔
306
  msg->chanId = chanId;
3✔
307
  *httpMsg = msg;
3✔
308
  return 0;
3✔
309
}
310
/*
 * Release a message and every string/buffer it owns. NULL is accepted and
 * ignored.
 */
static void httpDestroyMsg(SHttpMsg* msg) {
  if (msg != NULL) {
    taosMemoryFree(msg->server);
    taosMemoryFree(msg->uri);
    taosMemoryFree(msg->cont);
    if (msg->qid != NULL) {
      taosMemoryFree(msg->qid);
    }
    taosMemoryFree(msg);
  }
}
319
static void httpDestroyMsgWrapper(void* cont, void* param) {
×
320
  SHttpMsg* pMsg = cont;
×
321
  tWarn("http-report destroy msg, chanId:%" PRId64 ", seq:%" PRId64, pMsg->chanId, pMsg->seq);
×
322
  httpDestroyMsg(pMsg);
×
323
}
×
324

325
/*
 * When the channel is quitting, drop every queued report in item and keep
 * only the (last seen) quit marker, which is pushed back onto the queue.
 * Does nothing while the channel's quit flag is clear.
 * The caller holds item->mtx (see httpAsyncCb).
 */
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
  if (atomic_load_8(&http->quit) == 0) {
    return;
  }

  SHttpMsg* keptQuit = NULL;
  while (!QUEUE_IS_EMPTY(&item->qmsg)) {
    queue* node = QUEUE_HEAD(&item->qmsg);
    QUEUE_REMOVE(node);

    SHttpMsg* cur = QUEUE_DATA(node, SHttpMsg, q);
    if (cur->quit) {
      keptQuit = cur;
      continue;
    }

    tError("http-report failed to report chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cur->chanId, cur->seq,
           tstrerror(TSDB_CODE_HTTP_MODULE_QUIT));
    httpDestroyMsg(cur);
  }

  if (keptQuit != NULL) {
    QUEUE_PUSH(&item->qmsg, &keptQuit->q);
  }
}
347
/*
 * Debug aid: when debug logging is enabled and the batch is not empty, log
 * the sequence numbers of its first and last message together with the
 * highest sequence number handed out so far.
 */
static void httpTrace(queue* q) {
  if (!(rpcDebugFlag & DEBUG_DEBUG)) {
    return;
  }
  if (QUEUE_IS_EMPTY(q)) {
    return;
  }

  queue*    first = QUEUE_HEAD(q);
  SHttpMsg* firstMsg = QUEUE_DATA(first, SHttpMsg, q);
  int64_t   startSeq = firstMsg->seq;

  queue*    last = QUEUE_TAIL(q);
  SHttpMsg* lastMsg = QUEUE_DATA(last, SHttpMsg, q);
  int64_t   endSeq = lastMsg->seq;

  tDebug("http-report process msg, start_seq:%" PRId64 ", end_seq:%" PRId64 ", max_seq:%" PRId64, startSeq, endSeq,
         atomic_load_64(&httpSeqNum) - 1);
}
366

367
/*
 * Async wake-up handler running on the channel's loop thread.
 *
 * Under the item mutex it discards pending reports if the channel is
 * quitting and then moves up to BATCH_SIZE queued messages into a local
 * batch. Outside the lock every report in the batch is handed to
 * httpHandleReq; a quit marker, if present, is processed last.
 */
static void httpAsyncCb(uv_async_t* handle) {
  static int32_t BATCH_SIZE = 20;

  SAsyncItem*  item = handle->data;
  SHttpModule* http = item->pThrd;
  SHttpMsg*    pendingQuit = NULL;

  queue batch;
  QUEUE_INIT(&batch);

  if ((taosThreadMutexLock(&item->mtx)) != 0) {
    tError("http-report failed to lock mutex");
  }
  httpMayDiscardMsg(http, item);

  for (int32_t taken = 0; !QUEUE_IS_EMPTY(&item->qmsg) && taken < BATCH_SIZE; taken++) {
    queue* node = QUEUE_HEAD(&item->qmsg);
    QUEUE_REMOVE(node);
    QUEUE_PUSH(&batch, node);
  }
  if (taosThreadMutexUnlock(&item->mtx) != 0) {
    tError("http-report failed to unlock mutex");
  }

  httpTrace(&batch);

  while (!QUEUE_IS_EMPTY(&batch)) {
    queue* node = QUEUE_HEAD(&batch);
    QUEUE_REMOVE(node);

    SHttpMsg* cur = QUEUE_DATA(node, SHttpMsg, q);
    if (!cur->quit) {
      httpHandleReq(cur);
    } else {
      pendingQuit = cur;
    }
  }

  // Quit is handled only after all reports of this batch were dispatched.
  if (pendingQuit) {
    httpHandleQuit(pendingQuit);
  }
}
1✔
406

407
/*
 * Free a client together with its header buffer, body buffer, write-buffer
 * array, receive buffer and address string. cli->wbuf must be non-NULL.
 * The recv-buffer reference (recvBufRid) is not released here.
 */
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
  uv_buf_t* bufs = cli->wbuf;

  taosMemoryFree(bufs[0].base);  // request header
  taosMemoryFree(bufs[1].base);  // request body
  taosMemoryFree(bufs);

  taosMemoryFree(cli->rbuf);
  taosMemoryFree(cli->addr);
  taosMemoryFree(cli);
}
3✔
417

418
/*
 * libuv close callback for a client's TCP handle: decrements the owning
 * channel's connection counter (if the channel still exists) and frees the
 * client.
 * NOTE(review): the counter is decremented here even for clients whose
 * connect never succeeded (clientConnCb only increments on success) - confirm
 * whether connNum is expected to stay balanced.
 */
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
  SHttpClient* cli = handle->data;
  int64_t      chanId = cli->chanId;

  SHttpModule* owner = taosAcquireRef(httpRefMgt, chanId);
  if (owner != NULL) {
    owner->connNum -= 1;
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  }

  destroyHttpClient(cli);
}
3✔
430

431
/*
 * libuv allocation callback: always hands out the client's fixed receive
 * buffer of HTTP_RECV_BUF_SIZE bytes; suggested_size is ignored.
 */
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SHttpClient* owner = handle->data;

  buf->len = HTTP_RECV_BUF_SIZE;
  buf->base = owner->rbuf;
}
×
436

437
/*
 * libuv read callback. On a successful read, and if the client carries a
 * recv-buffer reference, the received bytes are copied into that buffer
 * unless it was already filled. The connection is closed after the first
 * callback in every case.
 */
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  STUB_RAND_NETWORK_ERR(nread);
  SHttpClient* cli = handle->data;

  if (nread >= 0) {
    tTrace("http-report succ to recv %d bytes, seq:%" PRId64, (int32_t)nread, cli->seq);

    SHttpRecvBuf* slot = NULL;
    if (cli->recvBufRid > 0) {
      slot = taosAcquireRef(httpRecvRefMgt, cli->recvBufRid);
      if (slot == NULL) {
        tWarn("http-report failed to acquire recv buf since %s", tstrerror(terrno));
      }
    }

    if (slot != NULL) {
      taosWLockLatch(&slot->latch);
      if (slot->inited == 0) {
        // One extra zeroed byte keeps the copy NUL-terminated.
        slot->pBuf = taosMemoryCalloc(1, nread + 1);
        if (slot->pBuf == NULL) {
          tError("http-report failed to dump recv since %s", tstrerror(terrno));
        } else {
          memcpy(slot->pBuf, buf->base, nread);
          slot->nBuf = nread;
          slot->inited = 1;
        }
      } else {
        tDebug("http-report already recv valid data");
      }
      taosWUnLockLatch(&slot->latch);
      TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, cli->recvBufRid));
    }
  } else {
    tError("http-report recv error:%s, seq:%" PRId64, uv_strerror(nread), cli->seq);
  }

  if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
  }
}
×
471
/*
 * libuv write callback. A failed write closes the connection; a successful
 * one starts reading the response, closing the connection if reading cannot
 * be started.
 */
static void clientSentCb(uv_write_t* req, int32_t status) {
  STUB_RAND_NETWORK_ERR(status);
  SHttpClient* cli = req->data;
  uv_handle_t* tcpHandle = (uv_handle_t*)&cli->tcp;

  if (status != 0) {
    tError("http-report failed to send data, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64,
           uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
    if (!uv_is_closing(tcpHandle)) {
      uv_close(tcpHandle, clientCloseCb);
    }
    return;
  }

  tTrace("http-report succ to send data, chanId:%" PRId64 ", seq:%" PRId64, cli->chanId, cli->seq);

  status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
  if (status == 0) {
    return;
  }

  tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64,
         uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
  if (!uv_is_closing(tcpHandle)) {
    uv_close(tcpHandle, clientCloseCb);
  }
}
494
/*
 * libuv connect callback.
 *
 * On failure the destination is recorded in the channel's fail-fast table
 * and the connection is closed. On success the failure record is cleared,
 * the channel's connection counter is incremented and the header and body
 * buffers are written.
 *
 * If the owning channel can no longer be acquired the connection is simply
 * closed; the channel is never dereferenced in that case.
 */
static void clientConnCb(uv_connect_t* req, int32_t status) {
  STUB_RAND_NETWORK_ERR(status);
  SHttpClient* cli = req->data;
  int64_t      chanId = cli->chanId;

  SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
  if (http == NULL) {
    tError("http-report failed to acquire chan, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
           cli->port, chanId, cli->seq, tstrerror(terrno));
    if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
      uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    }
    return;
  }

  if (status != 0) {
    httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
    tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64,
           uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
    if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
      uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    }
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
    return;
  }
  http->connNum += 1;

  httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);

  // wbuf[0] is the header, wbuf[1] the body; both are sent in one write.
  status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
  if (0 != status) {
    tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64,
           uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
    if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
      uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    }
  }
  TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
}
524

525
/*
 * Queue a quit marker for the channel's loop thread.
 *
 * Returns 0 on success, terrno if the marker cannot be allocated and
 * TSDB_CODE_THIRDPARTY_ERROR if it cannot be queued.
 */
int32_t httpSendQuit(SHttpModule* http, int64_t chanId) {
  SHttpMsg* quitMsg = taosMemoryCalloc(1, sizeof(SHttpMsg));
  if (quitMsg == NULL) {
    return terrno;
  }

  quitMsg->seq = atomic_fetch_add_64(&httpSeqNum, 1);
  quitMsg->quit = 1;
  quitMsg->chanId = chanId;

  if (transAsyncSend(http->asyncPool, &(quitMsg->q)) == 0) {
    return 0;
  }

  // The marker was not queued, so it is still ours to free.
  taosMemoryFree(quitMsg);
  return TSDB_CODE_THIRDPARTY_ERROR;
}
542

543
/*
 * libuv close callback that only frees the client stored in handle->data,
 * without touching the channel's connection counter.
 */
static void httpDestroyClientCb(uv_handle_t* handle) {
  SHttpClient* cli = (SHttpClient*)handle->data;
  destroyHttpClient(cli);
}
×
547
/*
 * uv_walk callback used at shutdown: closes every handle that is not already
 * closing. TCP handles get httpDestroyClientCb so their client is freed; all
 * other handle types are closed without a callback. arg is unused.
 */
static void httpWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }

  if (uv_handle_get_type(handle) == UV_TCP) {
    uv_close(handle, httpDestroyClientCb);
  } else {
    uv_close(handle, NULL);
  }
}
559
/*
 * Process a quit marker on the loop thread: frees the marker and closes all
 * handles of the channel's loop. Does nothing further if the channel can no
 * longer be acquired.
 */
static void httpHandleQuit(SHttpMsg* msg) {
  const int64_t chanId = msg->chanId;
  const int64_t seq = msg->seq;
  taosMemoryFree(msg);

  tDebug("http-report receive quit, chanId:%" PRId64 ", seq:%" PRId64, chanId, seq);

  SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
  if (http != NULL) {
    uv_walk(http->loop, httpWalkCb, NULL);
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  }
}
570

571
/*
 * Fail-fast check: returns true when server:port failed to connect within
 * the last FAST_FAILURE_LIMIT seconds according to pTable, so the caller can
 * skip the request instead of waiting for another connect timeout.
 *
 * The key is formatted with a bounded snprintf; an over-long server name is
 * truncated rather than overflowing the stack buffer.
 */
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
  char buf[256] = {0};
  (void)snprintf(buf, sizeof(buf), "%s:%d", server, port);

  int32_t* failedTime = (int32_t*)taosHashGet(pTable, buf, strlen(buf));
  if (failedTime == NULL) {
    return false;
  }

  int32_t now = taosGetTimestampSec();
  if (*failedTime > now - FAST_FAILURE_LIMIT) {
    tDebug("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf);
    return true;
  }
  return false;
}
588
/*
 * Update the fail-fast table for server:port. A success removes the entry;
 * a failure stores the current time in seconds. Failures to store are only
 * logged.
 *
 * The key is formatted with a bounded snprintf; an over-long server name is
 * truncated rather than overflowing the stack buffer.
 */
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) {
  int32_t code = 0;
  char    buf[256] = {0};
  (void)snprintf(buf, sizeof(buf), "%s:%d", server, port);

  if (succ) {
    TAOS_UNUSED(taosHashRemove(pTable, buf, strlen(buf)));
    return;
  }

  int32_t st = taosGetTimestampSec();
  if ((code = taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st))) != 0) {
    tError("http-report failed to update conn status, dst:%s, reason:%s", buf, tstrerror(code));
  }
}
603
/*
 * Turn a queued report into an outgoing connection on the loop thread.
 *
 * Steps: acquire the channel, apply the fail-fast check, resolve the
 * destination, optionally gzip the body, build the header, create the
 * client and start the TCP connect. Ownership of msg->server, msg->cont and
 * the header moves to the client; the rest of msg is freed here.
 *
 * msg is always consumed. Errors are logged, never returned.
 */
static void httpHandleReq(SHttpMsg* msg) {
  int64_t            chanId = msg->chanId;
  int32_t            ignore = false;
  char*              header = NULL;
  int32_t            code = 0;
  struct sockaddr_in dest = {0};
  uv_buf_t*          wb = NULL;
  SHttpClient*       cli = NULL;

  SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
  if (http == NULL) {
    code = terrno;
    goto END;
  }
  if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
    ignore = true;
    goto END;
  }

  // Keep the resolver's error so the log below reports the real reason.
  code = taosBuildDstAddr(msg->server, msg->port, &dest);
  if (code != 0) {
    goto END;
  }

  if (msg->flag == HTTP_GZIP) {
    int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
    if (dstLen < 0) {
      code = dstLen;
      goto END;
    }
    if (dstLen > 0) {
      msg->len = dstLen;
    } else {
      msg->flag = HTTP_FLAT;
    }
  }

  int32_t cap = 2048;
  header = taosMemoryCalloc(1, cap);
  if (header == NULL) {
    code = terrno;
    goto END;
  }

  code = taosBuildHttpHeader(msg->server, msg->uri, msg->len, msg->qid, header, cap, msg->flag);
  if (code != 0) {
    goto END;
  }

  wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
  if (wb == NULL) {
    code = terrno;
    goto END;
  }

  wb[0] = uv_buf_init((char*)header, strlen(header));  //  heap var
  wb[1] = uv_buf_init((char*)msg->cont, msg->len);     //  heap var

  cli = taosMemoryCalloc(1, sizeof(SHttpClient));
  if (cli == NULL) {
    code = terrno;  // read before the free below can disturb it
    taosMemoryFree(wb);
    goto END;
  }
  cli->seq = msg->seq;
  cli->conn.data = cli;
  cli->tcp.data = cli;
  cli->req.data = cli;
  cli->dest = dest;
  cli->chanId = chanId;
  cli->addr = msg->server;
  cli->port = msg->port;
  cli->recvBufRid = msg->recvBufRid;
  cli->wbuf = wb;

  // From here on the client owns server, cont (via wb[1]) and header (via
  // wb[0]); only the remaining pieces of msg are released.
  if (msg->qid != NULL) taosMemoryFree(msg->qid);
  taosMemoryFree(msg->uri);
  taosMemoryFree(msg);
  msg = NULL;
  header = NULL;

  cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
  if (cli->rbuf == NULL) {
    tError("http-report failed to alloc read buf, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ",reason:%s", cli->addr,
           cli->port, chanId, cli->seq, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    destroyHttpClient(cli);
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
    return;
  }

  int err = uv_tcp_init(http->loop, &cli->tcp);
  if (err != 0) {
    tError("http-report failed to init socket handle, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s",
           cli->addr, cli->port, chanId, cli->seq, uv_strerror(err));
    destroyHttpClient(cli);
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
    return;
  }

  // The handle is now registered with the loop. It must be released through
  // uv_close(); freeing the client directly would leave a dangling handle
  // that a later uv_walk() would touch.

  // set up timeout to avoid stuck;
  int32_t fd = taosCreateSocketWithTimeout(5000);
  if (fd < 0) {
    tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
           cli->port, chanId, cli->seq, tstrerror(terrno));
    uv_close((uv_handle_t*)&cli->tcp, httpDestroyClientCb);
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
    return;
  }

  int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
  if (ret != 0) {
    tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
           cli->port, chanId, cli->seq, uv_strerror(ret));
    // NOTE(review): fd was not adopted by libuv here and is not closed by
    // this function - confirm and close it with the project's socket helper.
    uv_close((uv_handle_t*)&cli->tcp, httpDestroyClientCb);
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
    return;
  }

  ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
  if (ret != 0) {
    tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s",
           cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret));
    httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
    uv_close((uv_handle_t*)&cli->tcp, httpDestroyClientCb);
  }
  TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  return;

END:
  if (ignore == false) {
    tError("http-report failed to report to addr:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 " reason:%s", msg->server,
           msg->port, chanId, msg->seq, tstrerror(code));
  }
  httpDestroyMsg(msg);
  taosMemoryFree(header);
  // Only drop the reference if it was actually acquired.
  if (http != NULL) {
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  }
}
736

737
/*
 * Release everything a channel owns - pending messages and the async pool,
 * the libuv loop and the fail-fast table - but not the SHttpModule itself,
 * which is freed through the reference set. NULL is ignored.
 */
static void httpModuleDestroy(SHttpModule* http) {
  if (!http) {
    return;
  }

  if (http->asyncPool != NULL) {
    // Drain and free queued messages before the pool goes away.
    TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
    transAsyncPoolDestroy(http->asyncPool);
  }

  if (http->loop) {
    TAOS_UNUSED(uv_loop_close(http->loop));
    taosMemoryFree(http->loop);
  }

  taosHashCleanup(http->connStatusTable);
  // http itself is not freed here; the reference set frees it
}
752

753
/*
 * Destroy a channel that is not (or not yet) registered in the reference
 * set: releases its resources and frees the struct itself. NULL is ignored.
 */
void httpModuleDestroy2(SHttpModule* http) {
  if (http != NULL) {
    httpModuleDestroy(http);
    taosMemoryFree(http);
  }
}
758

759
/*
 * Queue a report on channel chanId and associate it with the response
 * buffer identified by recvBufRid.
 *
 * Returns 0 once the message is queued (it is then owned by the loop
 * thread). Otherwise returns the error from message creation, terrno if the
 * channel cannot be acquired, or TSDB_CODE_HTTP_MODULE_QUIT if the channel
 * is quitting or the message cannot be queued.
 */
static int32_t taosSendHttpReportImplByChan2(const char* server, const char* uri, uint16_t port, char* pCont,
                                             int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid,
                                             int64_t recvBufRid) {
  SHttpModule* load = NULL;
  SHttpMsg*    msg = NULL;
  int64_t      seq = -1;  // stays -1 when no message could be created
  int32_t      code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
  if (code != 0) {
    goto _ERROR;
  }

  seq = msg->seq;
  msg->recvBufRid = recvBufRid;

  load = taosAcquireRef(httpRefMgt, chanId);
  if (load == NULL) {
    code = terrno;
    goto _ERROR;
  }

  if (atomic_load_8(&load->quit)) {
    code = TSDB_CODE_HTTP_MODULE_QUIT;
    goto _ERROR;
  }
  tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64, chanId, seq);

  code = transAsyncSend(load->asyncPool, &(msg->q));
  if (code != 0) {
    code = TSDB_CODE_HTTP_MODULE_QUIT;
    goto _ERROR;
  }
  // Queued: the loop thread owns the message now.
  msg = NULL;

_ERROR:

  if (code != 0) {
    // Log the cached sequence number; msg is NULL when creation failed.
    tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64, tstrerror(code), chanId, seq);
  }
  httpDestroyMsg(msg);
  if (load != NULL) TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  return code;
}
800
/*
 * Queue a report on channel chanId without capturing the response.
 *
 * Returns 0 once the message is queued (it is then owned by the loop
 * thread). Otherwise returns the error from message creation, terrno if the
 * channel cannot be acquired, or TSDB_CODE_HTTP_MODULE_QUIT if the channel
 * is quitting or the message cannot be queued.
 */
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
                                            int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) {
  SHttpModule* load = NULL;
  SHttpMsg*    msg = NULL;
  int64_t      seq = -1;  // stays -1 when no message could be created
  int32_t      code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
  if (code != 0) {
    goto _ERROR;
  }

  seq = msg->seq;
  // No response buffer on this path; clientRecvCb ignores ids <= 0.
  msg->recvBufRid = 0;

  load = taosAcquireRef(httpRefMgt, chanId);
  if (load == NULL) {
    code = terrno;
    goto _ERROR;
  }

  if (atomic_load_8(&load->quit)) {
    code = TSDB_CODE_HTTP_MODULE_QUIT;
    goto _ERROR;
  }
  tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64, chanId, seq);

  code = transAsyncSend(load->asyncPool, &(msg->q));
  if (code != 0) {
    code = TSDB_CODE_HTTP_MODULE_QUIT;
    goto _ERROR;
  }
  // Queued: the loop thread owns the message now.
  msg = NULL;

_ERROR:

  if (code != 0) {
    // Log the cached sequence number; msg is NULL when creation failed.
    tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64, tstrerror(code), chanId, seq);
  }
  httpDestroyMsg(msg);
  if (load != NULL) TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
  return code;
}
838

839
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
×
840
                                 EHttpCompFlag flag, int64_t chanId, const char* qid) {
841
  return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId, qid);
×
842
}
843

844
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
×
845
                           EHttpCompFlag flag) {
846
  return taosSendHttpReportWithQID(server, uri, port, pCont, contLen, flag, NULL);
×
847
}
848

849
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
3✔
850
                                  EHttpCompFlag flag, const char* qid) {
851
  TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
3✔
852
  return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid);
3✔
853
}
854

855
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
×
856
                                      EHttpCompFlag flag, const char* qid, int64_t recvBufId) {
857
  TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
×
858
  return taosSendHttpReportImplByChan2(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid, recvBufId);
×
859
}
860

861
/*
 * Destructor registered for httpRefMgt entries: frees the handle memory only.
 */
static void transHttpDestroyHandle(void* handle) {
  taosMemoryFree(handle);
}
×
862

863
static void transHttpDestroyRecvHandle(void* handle) {
×
864
  SHttpRecvBuf* p = handle;
×
865
  taosMemoryFree(p->pBuf);
×
866
  taosMemoryFree(p);
×
867
}
×
868

869
int64_t transInitHttpChanImpl();
870

871
static void transHttpEnvInit() {
1✔
872
  httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
1✔
873
  httpDefaultChanId = transInitHttpChanImpl();
1✔
874
  httpSeqNum = 0;
1✔
875
  httpRecvRefMgt = taosOpenRef(8, transHttpDestroyRecvHandle);
1✔
876
}
1✔
877

878
void transHttpEnvDestroy() {
489✔
879
  // remove default chanId
880
  taosDestroyHttpChan(httpDefaultChanId);
489✔
881
  httpDefaultChanId = -1;
489✔
882
  taosCloseRef(httpRecvRefMgt);
489✔
883
}
489✔
884

885
int64_t transInitHttpChanImpl() {
1✔
886
  int32_t      code = 0;
1✔
887
  SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
1✔
888
  if (http == NULL) {
1✔
889
    code = terrno;
×
890
    goto _ERROR;
×
891
  }
892

893
  http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1✔
894
  if (http->connStatusTable == NULL) {
1✔
895
    code = terrno;
×
896
    goto _ERROR;
×
897
  }
898

899
  http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
1✔
900
  if (http->loop == NULL) {
1✔
901
    code = terrno;
×
902
    goto _ERROR;
×
903
  }
904

905
  int err = uv_loop_init(http->loop);
1✔
906
  if (err != 0) {
1✔
907
    tError("http-report failed init uv, reason:%s", uv_strerror(err));
×
908
    code = TSDB_CODE_THIRDPARTY_ERROR;
×
909
    goto _ERROR;
×
910
  }
911

912
  code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool);
1✔
913
  if (code != 0) {
1✔
914
    goto _ERROR;
×
915
  }
916

917
  http->quit = 0;
1✔
918

919
  err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
1✔
920
  if (err != 0) {
1✔
921
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
922
    goto _ERROR;
×
923
  }
924

925
  int64_t ref = taosAddRef(httpRefMgt, http);
1✔
926
  if (ref < 0) {
1✔
927
    goto _ERROR;
×
928
  }
929
  return ref;
1✔
930

931
_ERROR:
×
932
  httpModuleDestroy2(http);
×
933
  return code;
×
934
}
935
int64_t taosInitHttpChan() {
×
936
  TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
×
937
  return transInitHttpChanImpl();
×
938
}
939

940
void taosDestroyHttpChan(int64_t chanId) {
489✔
941
  tDebug("http-report send quit, chanId:%" PRId64, chanId);
489✔
942

943
  int          ret = 0;
489✔
944
  SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
489✔
945
  if (load == NULL) {
489✔
946
    tError("http-report failed to destroy chanId:%" PRId64 ", reason:%s", chanId, tstrerror(terrno));
489✔
947
    ret = terrno;
489✔
948
    return;
489✔
949
  }
950

951
  atomic_store_8(&load->quit, 1);
×
952
  ret = httpSendQuit(load, chanId);
×
953
  if (ret != 0) {
×
954
    tDebug("http-report already destroyed, chanId %" PRId64 ",reason:%s", chanId, tstrerror(ret));
×
955
    TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
×
956
    return;
×
957
  }
958

959
  if (taosThreadJoin(load->thread, NULL) != 0) {
×
960
    tTrace("http-report failed to join thread, chanId %" PRId64, chanId);
×
961
  }
962

963
  httpModuleDestroy(load);
×
964

965
  TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
×
966
  TAOS_UNUSED(taosRemoveRef(httpRefMgt, chanId));
×
967
}
968
static int32_t taosAllocHttpRecvHandle(int64_t* rid) {
×
969
  TAOS_UNUSED(taosThreadOnce(&transHttpInit, transHttpEnvInit));
×
970
  SHttpRecvBuf* p = taosMemoryCalloc(1, sizeof(SHttpRecvBuf));
×
971
  if (p == NULL) {
×
972
    return terrno;
×
973
  }
974
  taosInitRWLatch(&p->latch);
×
975

976
  int64_t id = taosAddRef(httpRecvRefMgt, p);
×
977
  if (taosAcquireRef(httpRecvRefMgt, id) == NULL) {
×
978
    taosMemoryFree(p);
×
979
    return terrno;
×
980
  }
981
  *rid = id;
×
982
  return 0;
×
983
}
984
static void taosFreeHttpRecvHandle(int64_t rid) {
×
985
  if (rid <= 0) {
×
986
    return;
×
987
  }
988
  TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid));
×
989
  TAOS_UNUSED(taosRemoveRef(httpRecvRefMgt, rid));
×
990
}
991
static int32_t taosGetHttpRecvById(int64_t rid, char** pRecv, int32_t* len) {
×
992
  int32_t       code = 0;
×
993
  SHttpRecvBuf* p = taosAcquireRef(httpRecvRefMgt, rid);
×
994
  if (p == NULL) {
×
995
    return TSDB_CODE_INVALID_PARA;
×
996
  }
997
  taosWLockLatch(&p->latch);
×
998
  if (p->inited) {
×
999
    *pRecv = p->pBuf;
×
1000
    *len = p->nBuf;
×
1001
    p->pBuf = 0;
×
1002
    p->nBuf = 0;
×
1003
  } else {
1004
    code = TSDB_CODE_INVALID_PARA;
×
1005
  }
1006
  taosWUnLockLatch(&p->latch);
×
1007
  TAOS_UNUSED(taosReleaseRef(httpRecvRefMgt, rid));
×
1008

1009
  return code;
×
1010
}
1011

1012
/*
 * Initialize a telemetry address manager.
 *
 * Copies defaultAddr into mgt->defaultAddr (bounded by the field size), clears
 * the cached address and resets the receive-buffer handle id to 0.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when mgt or defaultAddr is NULL.
 */
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr) {
  // Both pointers are dereferenced below; reject NULL instead of crashing.
  if (mgt == NULL || defaultAddr == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  tstrncpy(mgt->defaultAddr, defaultAddr, sizeof(mgt->defaultAddr));
  mgt->cachedAddr[0] = 0;
  mgt->recvBufRid = 0;
  return 0;
}
1018
/*
 * Release the receive-buffer handle held by a telemetry address manager.
 * Does nothing when mgt is NULL or no handle (id <= 0) is held.
 */
void taosTelemetryDestroy(STelemAddrMgmt* mgt) {
  if (mgt != NULL && mgt->recvBufRid > 0) {
    taosFreeHttpRecvHandle(mgt->recvBufRid);
    mgt->recvBufRid = 0;
  }
}
1025
// TODO: parse http response head By LIB
1026
static int32_t taosTelemetryGetValueFromHttpResp(const char* response, const char* key, char* val) {
×
1027
  if (key == NULL || val == NULL) {
×
1028
    return TSDB_CODE_INVALID_PARA;
×
1029
  }
1030

1031
  int32_t code = 0, line = 0;
×
1032
  int32_t len = strlen(key);
×
1033
  int32_t cap = len + 16;
×
1034
  // const char* reportUrlKey = "\"report_url\":\"";
1035

1036
  char* buf = taosMemoryCalloc(1, cap);
×
1037
  if (buf == NULL) {
×
1038
    return terrno;
×
1039
  }
1040
  int32_t nBytes = snprintf(buf, cap, "\"%s\":\"", key);
×
1041
  if ((uint32_t)nBytes >= cap) {
×
1042
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
×
1043
  }
1044

1045
  char* start = strstr(response, buf);
×
1046
  if (start) {
×
1047
    start += strlen(buf);
×
1048
    char* end = strchr(start, '"');
×
1049
    if (end && end - start > 0) {
×
1050
      tstrncpy(val, start, end - start + 1);
×
1051
    } else {
1052
      TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
×
1053
    }
1054
  } else {
1055
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &line, _end);
×
1056
  }
1057
_end:
×
1058
  if (code != 0) {
×
1059
    tError("failed to get value from http response since %s", tstrerror(code));
×
1060
  }
1061
  taosMemoryFree(buf);
×
1062
  return code;
×
1063
}
1064
/*
 * Read the "report_url" value out of the response stored in the receive
 * buffer recvBufRid and write it to telemAddr.
 *
 * The response data taken from the buffer is freed here before returning.
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t taosGetAddrFromHttpResp(int64_t recvBufRid, char* telemAddr) {
  char*   respData = NULL;
  int32_t respLen = 0;

  int32_t code = taosGetHttpRecvById(recvBufRid, &respData, &respLen);
  if (code != 0) {
    tError("failed to get http recv buf since %s", tstrerror(code));
    return code;
  }

  code = taosTelemetryGetValueFromHttpResp(respData, "report_url", telemAddr);

  // Ownership of the data was transferred to us by taosGetHttpRecvById().
  taosMemoryFree(respData);
  return code;
}
1079
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
×
1080
                            EHttpCompFlag flag) {
1081
  int32_t code = 0;
×
1082
  int32_t line = 0;
×
1083
  int32_t sendMsg = 0;
×
1084
  char*   addr = mgt->defaultAddr;
×
1085
  if (mgt->cachedAddr[0] == 0) {
×
1086
    if (mgt->recvBufRid == 0) {
×
1087
      code = taosAllocHttpRecvHandle(&mgt->recvBufRid);
×
1088
      TAOS_CHECK_GOTO(code, &line, _end);
×
1089
      code = taosSendRecvHttpReportWithQID(mgt->defaultAddr, uri, port, pCont, contLen, flag, NULL, mgt->recvBufRid);
×
1090
      TAOS_CHECK_GOTO(code, &line, _end);
×
1091
    } else {
1092
      code = taosGetAddrFromHttpResp(mgt->recvBufRid, mgt->cachedAddr);
×
1093
      if (code != 0) {
×
1094
        tError("failed to get cache addr from http response since %s", tstrerror(code));
×
1095
        addr = mgt->defaultAddr;
×
1096
      } else {
1097
        addr = mgt->cachedAddr;
×
1098
      }
1099

1100
      if (mgt->recvBufRid > 0) {
×
1101
        taosFreeHttpRecvHandle(mgt->recvBufRid);
×
1102
        mgt->recvBufRid = 0;
×
1103
      }
1104
      sendMsg = 1;
×
1105
    }
1106
  } else {
1107
    addr = mgt->cachedAddr;
×
1108
    sendMsg = 1;
×
1109
  }
1110
  if (sendMsg == 1) {
×
1111
    code = taosSendHttpReport(addr, uri, port, pCont, contLen, flag);
×
1112
  }
1113
  tDebug("send telemetry port oldAddr:[%s], newAddr:[%s]", mgt->defaultAddr, mgt->cachedAddr);
×
1114
  return code;
×
1115

1116
_end:
×
1117
  if (code != 0) {
×
1118
    tError("failed to send telemetry since %s, default addr:%s, cachedAddr:%s", tstrerror(code), mgt->defaultAddr,
×
1119
           mgt->cachedAddr);
1120
  }
1121
  return code;
×
1122
}
1123
#else  // TD_ASTRA_RPC
1124

1125
#include "thttp.h"
1126

1127
/* TD_ASTRA_RPC build: telemetry is compiled out, so initialization is a no-op that reports success. */
int32_t taosTelemetryMgtInit(STelemAddrMgmt* mgt, char* defaultAddr) {
  (void)mgt;
  (void)defaultAddr;
  return 0;
}
1128
/* TD_ASTRA_RPC build: nothing was allocated, so there is nothing to release. */
void taosTelemetryDestroy(STelemAddrMgmt* mgt) {
  (void)mgt;
}
1129

1130
// not safe for multi-thread, should be called in the same thread
1131
int32_t taosSendTelemReport(STelemAddrMgmt* mgt, const char* uri, uint16_t port, char* pCont, int32_t contLen,
1132
                            EHttpCompFlag flag) {
1133
  return 0;
1134
}
1135

1136
int32_t taosSendRecvHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
1137
                                      EHttpCompFlag flag, const char* qid, int64_t recvBufId) {
1138
  return 0;
1139
}
1140

1141
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
1142
                                  EHttpCompFlag flag, const char* qid) {
1143
  return 0;
1144
}
1145
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc