• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4894

22 Dec 2025 09:33AM UTC coverage: 65.72% (+0.2%) from 65.57%
#4894

push

travis-ci

web-flow
Update README.md (#34007)

* Update README.md

* docs: update table of contents and improve installation instructions in README

* docs: adjust words

---------

Signed-off-by: WANG Xu <feici02@outlook.com>
Co-authored-by: WANG Xu <feici02@outlook.com>

184394 of 280577 relevant lines covered (65.72%)

111859687.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.47
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42
#include "clientSession.h"
43

44
#define TSC_VAR_NOT_RELEASE 1
45
#define TSC_VAR_RELEASED    0
46

47
// Jump to the enclosing function's `_end` label with
// TSDB_CODE_TSC_FAIL_GENERATE_JSON when the expression `c` evaluates to false.
// The enclosing function must declare a local `code` and an `_end` label.
// The argument is parenthesized so compound expressions negate as a whole.
#define ENV_JSON_FALSE_CHECK(c)                      \
  do {                                               \
    if (!(c)) {                                      \
      tscError("failed to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;       \
      goto _end;                                     \
    }                                                \
  } while (0)
55

56
#define ENV_ERR_RET(c, info)          \
57
  do {                                \
58
    int32_t _code = (c);              \
59
    if (_code != TSDB_CODE_SUCCESS) { \
60
      terrno = _code;                 \
61
      tscInitRes = _code;             \
62
      tscError(info);                 \
63
      return;                         \
64
    }                                 \
65
  } while (0)
66

67
STscDbg   tscDbg = {0};
68
SAppInfo  appInfo;
69
int64_t   lastClusterId = 0;
70
int32_t   clientReqRefPool = -1;
71
int32_t   clientConnRefPool = -1;
72
int32_t   clientStop = -1;
73
SHashObj *pTimezoneMap = NULL;
74

75
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
76
volatile int32_t    tscInitRes = 0;
77

78
/**
 * Add the request to the request ref pool and bump the request counters.
 *
 * Stores the new ref id in pRequest->self, increments pTscObj->numOfReqs and,
 * when the connection has an app instance attached, the app-level
 * totalRequests / currentRequests counters.
 *
 * @param pRequest request being registered
 * @param pTscObj  connection object the request belongs to
 * @return TSDB_CODE_SUCCESS, or terrno when the ref could not be added
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // The summary counters are 64-bit; keep them 64-bit locally so the log
    // does not show truncated (possibly negative) values once they pass 2^31.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ", QID:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}
102

103
/**
 * Join the entries of `list` into `buf`, separated by commas.
 *
 * For every entry, anything up to and including the first '.' is dropped
 * before it is appended. Stops early when an entry cannot be fetched, when
 * formatting fails, or when the running length reaches `size`.
 *
 * @param list array of NUL-terminated names
 * @param buf  destination buffer, expected to hold a valid C string on entry
 * @param size capacity budget for `buf`
 */
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;
  int idx = 0;

  while (idx < taosArrayGetSize(list)) {
    char *name = taosArrayGet(list, idx);
    if (name == NULL) {
      tscError("get dbname failed, buf:%s", buf);
      return;
    }

    // keep only the part after the first '.', if there is one
    char *sep = strchr(name, '.');
    name = (sep != NULL) ? sep + 1 : name;

    if (idx > 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      return;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      return;
    }

    idx++;
  }
}
×
131
#ifdef USE_REPORT
132
/**
 * Build the JSON slow-log record for a finished request and hand it to the
 * monitor queue.
 *
 * The record carries cluster id, start timestamp, request id, duration, result
 * code, request type, row count, SQL text (cut at tsSlowLogMaxLen), user,
 * process name/id, local FQDN and the database name(s).
 *
 * @param pTscObj  connection the request ran on
 * @param pRequest finished request
 * @param reqType  slow-log type written to the "type" field
 * @param duration elapsed time; written as duration / 1000 in "query_time"
 * @return TSDB_CODE_SUCCESS, or an error code when the JSON could not be
 *         built or the data could not be queued
 */
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("failed to create monitor json");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
  if (pRequest->sqlstr != NULL &&
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Temporarily cut the SQL in place to copy only the allowed prefix. The
    // overwritten byte is restored BEFORE the failure check, so an early
    // `goto _end` can no longer leave the request's SQL truncated.
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    cJSON *sqlItem = cJSON_CreateString(pRequest->sqlstr);
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlItem));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // the printed text is freed here only when queuing failed
    taosMemoryFree(value);
    goto _end;
  }

_end:
  cJSON_Delete(json);
  return code;
}
220
#endif
221
/**
 * Decide whether a request is NOT covered by the slow-log exception database.
 *
 * When the request has pDb set, only that name is compared. Otherwise every
 * entry of dbList is compared after dropping everything up to and including
 * its first '.'.
 *
 * @return false when a name equals `exceptDb` or a dbList entry cannot be
 *         fetched; true otherwise
 */
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (NULL != pRequest->pDb) {
    return 0 != strcmp(pRequest->pDb, exceptDb);
  }

  int idx = 0;
  while (idx < taosArrayGetSize(pRequest->dbList)) {
    char *name = taosArrayGet(pRequest->dbList, idx);
    if (name == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    // compare only the part after the first '.', if there is one
    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    if (0 == strcmp(name, exceptDb)) {
      return false;
    }
    idx++;
  }

  return true;
}
242

243
/**
 * Undo the bookkeeping of a registered request.
 *
 * Decrements the app-level and connection-level request counters, adds the
 * elapsed time to the insert/query totals, emits monitor and slow-log records
 * when USE_REPORT is defined, and finally releases the connection reference.
 *
 * @param pRequest request being torn down; NULL is logged and ignored
 */
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  // currentRequests is a 64-bit counter; keep the result 64-bit for logging.
  int64_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
           ", elapsed:%.2f ms, current:%d, app current:%" PRId64,
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the allocator reference is held.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

#ifdef USE_REPORT
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // Compare in signed 64-bit microseconds. Multiplying by an unsigned long
  // constant would convert a negative duration (clock stepped backwards) into
  // a huge unsigned value, and would wrap where unsigned long is 32 bits wide.
  const int64_t slowThresholdUs = (int64_t)pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000LL;
  if ((duration >= slowThresholdUs) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }
#endif

  releaseTscObj(pTscObj->id);
}
318

319
// todo close the transporter properly
320
/**
 * Close the RPC transporter owned by an app instance.
 * Does nothing when `pAppInfo` or its transporter is NULL.
 */
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
328

329
/**
 * Retry filter installed as `rfp` on the client RPC handle.
 *
 * @param code    error code of the failed message
 * @param msgType type of the failed message
 * @return true when the message should be retried: either a redirect error on
 *         a message type that is not one of the listed TDMT_SCH_* types, or
 *         one of the listed queue/sync error codes
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    bool isListedSchMsg = (TDMT_SCH_QUERY == msgType) || (TDMT_SCH_MERGE_QUERY == msgType) ||
                          (TDMT_SCH_FETCH == msgType) || (TDMT_SCH_MERGE_FETCH == msgType) ||
                          (TDMT_SCH_QUERY_HEARTBEAT == msgType) || (TDMT_SCH_DROP_TASK == msgType) ||
                          (TDMT_SCH_TASK_NOTIFY == msgType);
    return !isListedSchMsg;
  }

  bool isRetryCode = (TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY == code) || (TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE == code) ||
                     (TSDB_CODE_SYN_WRITE_STALL == code) || (TSDB_CODE_SYN_PROPOSE_NOT_READY == code) ||
                     (TSDB_CODE_SYN_RESTORING == code);
  if (!isRetryCode) {
    return false;
  }

  tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
  return true;
}
346

347
// start timer for particular msgType
348
/**
 * Report whether `msgType` is TDMT_VND_SUBMIT or TDMT_VND_CREATE_TABLE.
 * `code` is not consulted.
 */
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (TDMT_VND_SUBMIT == msgType) || (TDMT_VND_CREATE_TABLE == msgType);
}
354

355
// TODO refactor
356
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
1,803,829✔
357
  SRpcInit rpcInit;
1,784,730✔
358
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
1,803,829✔
359
  rpcInit.localPort = 0;
1,803,829✔
360
  rpcInit.label = "TSC";
1,803,829✔
361
  rpcInit.numOfThreads = tsNumOfRpcThreads;
1,803,829✔
362
  rpcInit.cfp = processMsgFromServer;
1,803,829✔
363
  rpcInit.rfp = clientRpcRfp;
1,803,829✔
364
  rpcInit.sessions = 1024;
1,803,829✔
365
  rpcInit.connType = TAOS_CONN_CLIENT;
1,803,829✔
366
  rpcInit.user = (char *)user;
1,803,829✔
367
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,803,829✔
368
  rpcInit.compressSize = tsCompressMsgSize;
1,803,829✔
369
  rpcInit.dfp = destroyAhandle;
1,803,829✔
370

371
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,803,829✔
372
  rpcInit.retryStepFactor = tsRedirectFactor;
1,803,829✔
373
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,803,829✔
374
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,803,829✔
375

376
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
1,803,829✔
377
  connLimitNum = TMAX(connLimitNum, 10);
1,803,829✔
378
  connLimitNum = TMIN(connLimitNum, 1000);
1,803,829✔
379
  rpcInit.connLimitNum = connLimitNum;
1,803,829✔
380
  rpcInit.shareConnLimit = tsShareConnLimit;
1,803,829✔
381
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,803,829✔
382
  rpcInit.startReadTimer = 1;
1,803,829✔
383
  rpcInit.readTimeout = tsReadTimeout;
1,803,829✔
384
  rpcInit.ipv6 = tsEnableIpv6;
1,803,829✔
385
  rpcInit.enableSSL = tsEnableTLS;
1,803,829✔
386
  rpcInit.enableSasl = tsEnableSasl;
1,803,829✔
387

388
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,803,829✔
389
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,803,829✔
390
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,803,829✔
391
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,803,829✔
392
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,803,829✔
393

394
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
1,803,829✔
395
  if (TSDB_CODE_SUCCESS != code) {
1,803,829✔
396
    tscError("invalid version string.");
×
397
    return code;
×
398
  }
399

400
  tscInfo("rpc max retry timeout %" PRId64 "", rpcInit.retryMaxTimeout);
1,803,829✔
401
  *pDnodeConn = rpcOpen(&rpcInit);
1,803,829✔
402
  if (*pDnodeConn == NULL) {
1,803,829✔
403
    tscError("failed to init connection to server since %s", tstrerror(terrno));
54✔
404
    code = terrno;
54✔
405
  }
406

407
  return code;
1,803,829✔
408
}
409

410
/**
 * Walk the request hash and destroy every request that can still be acquired.
 * Each hash entry is read as an int64_t request ref id.
 */
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRefId = pIter;
    SRequestObj *pReq = acquireRequest(*pRefId);
    if (pReq == NULL) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRefId);  // ignore error
  }
}
3,178,847✔
424

425
/**
 * Walk the request hash and call taos_stop_query on every request that can
 * still be acquired. Each hash entry is read as an int64_t request ref id.
 */
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRefId = pIter;
    SRequestObj *pReq = acquireRequest(*pRefId);
    if (pReq == NULL) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRefId);  // ignore error
  }
}
×
439

440
void destroyAppInst(void *info) {
1,803,775✔
441
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
1,803,775✔
442
  tscInfo("destroy app inst mgr %p", pAppInfo);
1,803,775✔
443

444
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
1,803,775✔
445
  if (TSDB_CODE_SUCCESS != code) {
1,803,775✔
446
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
447
  }
448

449
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
1,803,775✔
450

451
  code = taosThreadMutexUnlock(&appInfo.mutex);
1,803,775✔
452
  if (TSDB_CODE_SUCCESS != code) {
1,803,775✔
453
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
454
  }
455

456
  taosMemoryFreeClear(pAppInfo->instKey);
1,803,775✔
457
  closeTransporter(pAppInfo);
1,803,775✔
458

459
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
1,803,775✔
460
  if (TSDB_CODE_SUCCESS != code) {
1,803,775✔
461
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
462
  }
463

464
  taosArrayDestroy(pAppInfo->pQnodeList);
1,803,775✔
465
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
1,803,775✔
466
  if (TSDB_CODE_SUCCESS != code) {
1,803,775✔
467
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
468
  }
469

470
  taosMemoryFree(pAppInfo);
1,803,775✔
471
}
1,803,775✔
472

473
//  tscObj 1--->conn1
474
/// tscObj 2-->conn1
475
//  tscObj 3-->conn1
476

477
void destroyTscObj(void *pObj) {
3,178,847✔
478
  if (NULL == pObj) {
3,178,847✔
479
    return;
×
480
  }
481

482
  STscObj *pTscObj = pObj;
3,178,847✔
483
  int64_t  tscId = pTscObj->id;
3,178,847✔
484
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
3,178,847✔
485

486
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
3,178,847✔
487
  hbDeregisterConn(pTscObj, connKey);
3,178,847✔
488

489
  destroyAllRequests(pTscObj->pRequests);
3,178,847✔
490
  taosHashCleanup(pTscObj->pRequests);
3,178,847✔
491

492
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
3,178,847✔
493
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
3,178,847✔
494
           pTscObj->pAppInfo->numOfConns);
495

496
  // In any cases, we should not free app inst here. Or an race condition rises.
497
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
3,178,847✔
498

499
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
3,178,847✔
500
  taosMemoryFree(pTscObj);
3,178,847✔
501

502
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
3,178,847✔
503
}
504

505
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
3,338,664✔
506
                     STscObj **pObj) {
507
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
3,338,664✔
508
  if (NULL == *pObj) {
3,338,664✔
509
    return terrno;
×
510
  }
511

512
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,338,664✔
513
  if (NULL == (*pObj)->pRequests) {
3,338,664✔
514
    taosMemoryFree(*pObj);
×
515
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
516
  }
517

518
  (*pObj)->connType = connType;
3,338,664✔
519
  (*pObj)->pAppInfo = pAppInfo;
3,338,466✔
520
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
3,338,664✔
521
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
3,338,664✔
522
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
3,338,664✔
523

524
  if (db != NULL) {
3,338,664✔
525
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
3,338,409✔
526
  }
527

528
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
3,338,862✔
529

530
  int32_t code = TSDB_CODE_SUCCESS;
3,338,664✔
531

532
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
3,338,664✔
533
  if ((*pObj)->id < 0) {
3,338,630✔
534
    tscError("failed to add object to clientConnRefPool");
×
535
    code = terrno;
×
536
    taosMemoryFree(*pObj);
×
537
    return code;
×
538
  }
539

540
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
3,338,630✔
541

542
  updateConnAccessInfo(&(*pObj)->sessInfo);
3,338,664✔
543
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
3,338,630✔
544
  return code;
3,338,664✔
545
}
546

547
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
882,783,874✔
548

549
void releaseTscObj(int64_t rid) {
878,933,929✔
550
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
878,933,929✔
551
  if (TSDB_CODE_SUCCESS != code) {
878,938,695✔
552
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
553
  }
554
}
878,938,695✔
555

556
/**
 * Allocate a request object, bind it to connection `connId` and register it.
 *
 * @param connId   ref id of the connection (STscObj) the request runs on
 * @param type     request type stored in the request
 * @param reqid    request id to use, or 0 to generate one
 * @param pRequest receives the new request
 * @return TSDB_CODE_SUCCESS or an error code. TSDB_CODE_TSC_DISCONNECTED is
 *         returned when the connection can no longer be acquired.
 *
 * Failure handling: once the request has been bound to the connection
 * ((*pRequest)->pTscObj set), cleanup is delegated to doDestroyRequest.
 * Before that point this function releases what it acquired itself (the sync
 * query param with its semaphore, and the connection reference), frees the
 * request and sets *pRequest to NULL.
 */
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }

  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // never fall through with a NULL interParam, even if terrno was not set
    code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    goto _return;
  }
  code = tsem_init(&interParam->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // the semaphore was never initialised: free the param without destroying it
    taosMemoryFreeClear(interParam);
    TSC_ERR_JRET(code);
  }
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    // a NULL db is only an error when terrno reports one
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;
_return:
  if ((*pRequest)->pTscObj) {
    doDestroyRequest(*pRequest);
  } else {
    // The request was never bound to the connection, so doDestroyRequest is
    // not used; undo the partial setup here.
    if (interParam != NULL) {
      (void)tsem_destroy(&interParam->sem);
      taosMemoryFree(interParam);
    }
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
    *pRequest = NULL;
  }
  return code;
}
609

610
/**
 * Free every buffer owned by a result-info struct and reset the pointers.
 * `convertBuf` is an array of numOfCols buffers; each element is freed before
 * the array itself. The struct itself is not freed.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
783,229,469✔
627

628
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
2,147,483,647✔
629

630
/** Release one reference on the request with ref id `rid`; returns the result of taosReleaseRef. */
int32_t releaseRequest(int64_t rid) {
  int32_t rc = taosReleaseRef(clientReqRefPool, rid);
  return rc;
}
2,147,483,647✔
631

632
/** Remove the request with ref id `rid` from clientReqRefPool; returns the result of taosRemoveRef. */
int32_t removeRequest(int64_t rid) {
  int32_t rc = taosRemoveRef(clientReqRefPool, rid);
  return rc;
}
749,074,875✔
633

634
/// return the most previous req ref id
635
/**
 * Follow the relation.prevRefId chain starting at `pRequest` as far as the
 * requests can still be acquired, then remove the last request reached from
 * the ref pool.
 *
 * @return ref id of the request that was removed (pRequest->self when the
 *         chain is empty)
 */
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
  int64_t      oldestRefId = pRequest->self;
  SRequestObj *pCur = pRequest;

  for (;;) {
    if (0 == pCur->relation.prevRefId) {
      break;
    }

    pCur = acquireRequest(pCur->relation.prevRefId);
    if (NULL == pCur) {
      break;
    }

    oldestRefId = pCur->self;
    (void)releaseRequest(oldestRefId);  // ignore error
  }

  (void)removeRequest(oldestRefId);  // ignore error
  return oldestRefId;
}
650

651
void destroyNextReq(int64_t nextRefId) {
749,065,112✔
652
  if (nextRefId) {
749,065,112✔
653
    SRequestObj *pObj = acquireRequest(nextRefId);
×
654
    if (pObj) {
×
655
      (void)releaseRequest(nextRefId);  // ignore error
×
656
      (void)releaseRequest(nextRefId);  // ignore error
×
657
    }
658
  }
659
}
749,065,112✔
660

661
/**
 * Remove every request linked to `pRequest` through its relation chain.
 *
 * Nothing is done when `pRequest` has a userRefId that differs from its own
 * ref id. Otherwise the prevRefId chain is collected and removed from the
 * last collected entry to the first, then the nextRefId chain is walked and
 * each entry removed.
 *
 * At most tListLen(pReqList) previous requests are collected; a longer chain
 * is reported and cut there instead of writing past the local array.
 */
void destroySubRequests(SRequestObj *pRequest) {
  int32_t      reqNum = 0;
  SRequestObj *pReqList[16] = {NULL};
  uint64_t     tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (pTmp == NULL) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    if (reqNum >= (int32_t)tListLen(pReqList)) {
      // bounded collection: stop instead of overflowing pReqList
      (void)releaseRequest(tmpRefId);  // ignore error
      tscError("too many prev reqs, stop at req ref 0x%" PRIx64, tmpRefId);
      break;
    }

    pReqList[reqNum++] = pTmp;
    (void)releaseRequest(tmpRefId);  // ignore error
  }

  for (int32_t i = reqNum - 1; i >= 0; i--) {
    (void)removeRequest(pReqList[i]->self);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // read the next id before the references on this entry are dropped
      tmpRefId = pTmp->relation.nextRefId;
      (void)removeRequest(pTmp->self);   // ignore error
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
700

701
void doDestroyRequest(void *p) {
749,074,165✔
702
  if (NULL == p) {
749,074,165✔
703
    return;
×
704
  }
705

706
  SRequestObj *pRequest = (SRequestObj *)p;
749,074,165✔
707

708
  uint64_t reqId = pRequest->requestId;
749,074,165✔
709
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
749,077,004✔
710

711
  int64_t nextReqRefId = pRequest->relation.nextRefId;
749,077,004✔
712

713
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
749,076,095✔
714
  if (TSDB_CODE_SUCCESS != code) {
749,077,807✔
715
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
3,338,664✔
716
  }
717
  schedulerFreeJob(&pRequest->body.queryJob, 0);
749,077,807✔
718

719
  destorySqlCallbackWrapper(pRequest->pWrapper);
749,077,848✔
720

721
  taosMemoryFreeClear(pRequest->msgBuf);
749,073,439✔
722

723
  doFreeReqResultInfo(&pRequest->body.resInfo);
749,070,756✔
724
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
749,071,331✔
725
    tscError("failed to destroy semaphore");
×
726
  }
727

728
  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = -1};
749,074,029✔
729
  code = tscUpdateSessMgtMetric(pRequest->pTscObj, &para);
749,071,061✔
730

731
  taosArrayDestroy(pRequest->tableList);
749,070,026✔
732
  taosArrayDestroy(pRequest->targetTableList);
749,072,331✔
733
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
749,073,803✔
734

735
  if (pRequest->self) {
749,071,984✔
736
    deregisterRequest(pRequest);
749,071,306✔
737
  }
738

739
  taosMemoryFreeClear(pRequest->pDb);
749,073,259✔
740
  taosArrayDestroy(pRequest->dbList);
749,066,957✔
741
  if (pRequest->body.interParam) {
749,072,763✔
742
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
749,077,280✔
743
      tscError("failed to destroy semaphore in pRequest");
×
744
    }
745
  }
746
  taosMemoryFree(pRequest->body.interParam);
749,069,997✔
747

748
  qDestroyQuery(pRequest->pQuery);
749,072,130✔
749
  nodesDestroyAllocator(pRequest->allocatorRefId);
749,065,340✔
750

751
  taosMemoryFreeClear(pRequest->effectiveUser);
749,073,572✔
752
  taosMemoryFreeClear(pRequest->sqlstr);
749,073,058✔
753
  taosMemoryFree(pRequest);
749,065,307✔
754
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
749,067,061✔
755
  destroyNextReq(nextReqRefId);
749,067,061✔
756
}
757

758
/*
 * Stop the query behind pRequest and remove the request chain starting from
 * its most previous request. A NULL pRequest is ignored.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
764

765
/*
 * Mark pRequest as killed and, when it is a scheduled query, free its
 * scheduler job with TSDB_CODE_TSC_QUERY_KILLED.
 */
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  // Only requests that carry a query executed in schedule mode own a job to stop.
  bool isScheduledQuery = (NULL != pRequest->pQuery) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);

  if (isScheduledQuery) {
    schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
    tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
  } else {
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
  }
}
777

778
/*
 * Stop pRequest together with every request linked to it through
 * relation.prevRefId and relation.nextRefId.
 *
 * The call is a no-op when pRequest has a userRefId that differs from its own
 * ref id. Previous requests are acquired first, then stopped starting from the
 * one farthest away from pRequest; pRequest itself is stopped next, followed by
 * the next requests in chain order.
 *
 * At most 16 previous requests are handled. A longer chain is reported with
 * tscError and the walk stops, instead of writing past the end of pReqList.
 */
void stopAllQueries(SRequestObj *pRequest) {
  int32_t       reqIdx = -1;
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t maxPrev = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    if (reqIdx + 1 >= maxPrev) {
      tscError("too many prev reqs, stop at req ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(tmpRefId);  // ignore error
      break;
    }

    // The reference stays held until the request has been stopped below.
    pReqList[++reqIdx] = pTmp;
  }

  for (int32_t i = reqIdx; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
819
#ifdef USE_REPORT
820
// Puts clientStop back to -1. On Windows the crash report thread registers this
// function with atexit() when the library runs inside a DLL.
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
821

822
static void *tscCrashReportThreadFp(void *param) {
×
823
  int32_t code = 0;
×
824
  setThreadName("client-crashReport");
×
825
  char filepath[PATH_MAX] = {0};
×
826
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
827
  char     *pMsg = NULL;
×
828
  int64_t   msgLen = 0;
×
829
  TdFilePtr pFile = NULL;
×
830
  bool      truncateFile = false;
×
831
  int32_t   sleepTime = 200;
×
832
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
833
  int32_t   loopTimes = reportPeriodNum;
×
834

835
#ifdef WINDOWS
836
  if (taosCheckCurrentInDll()) {
837
    atexit(crashReportThreadFuncUnexpectedStopped);
838
  }
839
#endif
840

841
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
842
    return NULL;
×
843
  }
844
  STelemAddrMgmt mgt;
×
845
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
846
  if (code) {
×
847
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
848
    return NULL;
×
849
  }
850

851
  code = initCrashLogWriter();
×
852
  if (code) {
×
853
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
854
    return NULL;
×
855
  }
856

857
  while (1) {
858
    checkAndPrepareCrashInfo();
×
859
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
860
    if (loopTimes++ < reportPeriodNum) {
×
861
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
862
      taosMsleep(sleepTime);
×
863
      continue;
×
864
    }
865

866
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
867
    if (pMsg && msgLen > 0) {
×
868
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
869
        tscError("failed to send crash report");
×
870
        if (pFile) {
×
871
          taosReleaseCrashLogFile(pFile, false);
×
872
          pFile = NULL;
×
873

874
          taosMsleep(sleepTime);
×
875
          loopTimes = 0;
×
876
          continue;
×
877
        }
878
      } else {
879
        tscInfo("succeed to send crash report");
×
880
        truncateFile = true;
×
881
      }
882
    } else {
883
      tscInfo("no crash info was found");
×
884
    }
885

886
    taosMemoryFree(pMsg);
×
887

888
    if (pMsg && msgLen > 0) {
×
889
      pMsg = NULL;
×
890
      continue;
×
891
    }
892

893
    if (pFile) {
×
894
      taosReleaseCrashLogFile(pFile, truncateFile);
×
895
      pFile = NULL;
×
896
      truncateFile = false;
×
897
    }
898

899
    taosMsleep(sleepTime);
×
900
    loopTimes = 0;
×
901
  }
902
  taosTelemetryDestroy(&mgt);
×
903

904
  clientStop = -2;
×
905
  return NULL;
×
906
}
907

908
int32_t tscCrashReportInit() {
1,741,948✔
909
  if (!tsEnableCrashReport) {
1,741,948✔
910
    return TSDB_CODE_SUCCESS;
1,741,948✔
911
  }
912
  int32_t      code = TSDB_CODE_SUCCESS;
×
913
  TdThreadAttr thAttr;
×
914
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
915
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
916
  TdThread crashReportThread;
×
917
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
918
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
919
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
920
    TSC_ERR_RET(terrno);
×
921
  }
922

923
  (void)taosThreadAttrDestroy(&thAttr);
×
924
_return:
×
925
  if (code) {
×
926
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
927
    TSC_ERR_RET(terrno);
×
928
  }
929

930
  return code;
×
931
}
932

933
void tscStopCrashReport() {
1,741,983✔
934
  if (!tsEnableCrashReport) {
1,741,983✔
935
    return;
1,741,983✔
936
  }
937

938
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
939
    tscDebug("crash report thread already stopped");
×
940
    return;
×
941
  }
942

943
  while (atomic_load_32(&clientStop) > 0) {
×
944
    taosMsleep(100);
×
945
  }
946
}
947

948
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
949
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
950
}
×
951
#endif
952

953
#ifdef TAOSD_INTEGRATED
954
typedef struct {
955
  TdThread pid;
956
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
957
} SDaemonObj;
958

959
extern int  dmStartDaemon(int argc, char const *argv[]);
960
extern void dmStopDaemon();
961

962
SDaemonObj daemonObj = {0};
963

964
typedef struct {
965
  int32_t argc;
966
  char  **argv;
967
} SExecArgs;
968

969
static void *dmStartDaemonFunc(void *param) {
970
  int32_t    code = 0;
971
  SExecArgs *pArgs = (SExecArgs *)param;
972
  int32_t    argc = pArgs->argc;
973
  char     **argv = pArgs->argv;
974

975
  code = dmStartDaemon(argc, (const char **)argv);
976
  if (code != 0) {
977
    printf("failed to start taosd since %s\r\n", tstrerror(code));
978
    goto _exit;
979
  }
980

981
_exit:
982
  if (code != 0) {
983
    atomic_store_32(&daemonObj.stat, code);
984
  }
985
  return NULL;
986
}
987

988
static int32_t shellStartDaemon(int argc, char *argv[]) {
989
  int32_t    code = 0, lino = 0;
990
  SExecArgs *pArgs = NULL;
991
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
992

993
  TdThreadAttr thAttr;
994
  (void)taosThreadAttrInit(&thAttr);
995
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
996
#ifdef TD_COMPACT_OS
997
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
998
#endif
999
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
1000
  if (pArgs == NULL) {
1001
    code = terrno;
1002
    TAOS_CHECK_EXIT(code);
1003
  }
1004
  pArgs->argc = argc;
1005
  pArgs->argv = argv;
1006

1007
#ifndef TD_AS_LIB
1008
  tsLogEmbedded = 1;
1009
#endif
1010

1011
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
1012

1013
  while (true) {
1014
    if (atomic_load_64(&tsDndStart)) {
1015
      atomic_store_32(&daemonObj.stat, 1);
1016
      break;
1017
    }
1018
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
1019
    if (daemonstat < 0) {
1020
      code = daemonstat;
1021
      TAOS_CHECK_EXIT(code);
1022
    }
1023

1024
    if (daemonstat > 1) {
1025
      code = TSDB_CODE_APP_ERROR;
1026
      TAOS_CHECK_EXIT(code);
1027
    }
1028
    taosMsleep(1000);
1029
  }
1030

1031
_exit:
1032
  endMs = taosGetTimestampMs();
1033
  (void)taosThreadAttrDestroy(&thAttr);
1034
  taosMemoryFreeClear(pArgs);
1035
  if (code) {
1036
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1037
           endMs - startMs);
1038
  } else {
1039
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1040
  }
1041
#ifndef TD_AS_LIB
1042
  tsLogEmbedded = 0;
1043
#endif
1044
  return code;
1045
}
1046

1047
void shellStopDaemon() {
1048
#ifndef TD_AS_LIB
1049
  tsLogEmbedded = 1;
1050
#endif
1051
  dmStopDaemon();
1052
  if (taosCheckPthreadValid(daemonObj.pid)) {
1053
    (void)taosThreadJoin(daemonObj.pid, NULL);
1054
    taosThreadClear(&daemonObj.pid);
1055
  }
1056
}
1057
#endif
1058

1059
void taos_init_imp(void) {
1,741,983✔
1060
#if defined(LINUX)
1061
  if (tscDbg.memEnable) {
1,741,983✔
1062
    int32_t code = taosMemoryDbgInit();
×
1063
    if (code) {
×
1064
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
1065
    } else {
1066
      tsAsyncLog = false;
×
1067
      (void)printf("memory dbg enabled\n");
×
1068
    }
1069
  }
1070
#endif
1071

1072
  // In the APIs of other program language, taos_cleanup is not available yet.
1073
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
1074
  (void)atexit(taos_cleanup);
1,741,983✔
1075
  SET_ERRNO(TSDB_CODE_SUCCESS);
1,741,983✔
1076
  terrno = TSDB_CODE_SUCCESS;
1,741,983✔
1077
  taosSeedRand(taosGetTimestampSec());
1,741,983✔
1078

1079
  appInfo.pid = taosGetPId();
1,741,983✔
1080
  appInfo.startTime = taosGetTimestampMs();
1,741,983✔
1081
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,741,983✔
1082
  appInfo.pInstMapByClusterId =
1,741,983✔
1083
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,741,983✔
1084
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
1,741,983✔
1085
    (void)printf("failed to allocate memory when init appInfo\n");
×
1086
    tscInitRes = terrno;
×
1087
    return;
35✔
1088
  }
1089
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
1,741,983✔
1090

1091
  const char *logName = CUS_PROMPT "log";
1,741,983✔
1092
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
1,741,983✔
1093
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
1,741,983✔
1094
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
35✔
1095
    SET_ERROR_MSG("Create %s failed:%s. configDir=%s", logName, strerror(ERRNO), configDir);
35✔
1096
    tscInitRes = terrno;
35✔
1097
    return;
35✔
1098
  }
1099

1100
#ifdef TAOSD_INTEGRATED
1101
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
1102
#else
1103
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
1,741,948✔
1104
#endif
1105

1106
  initQueryModuleMsgHandle();
1,741,948✔
1107
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
1108
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
1,741,948✔
1109
    tscInitRes = terrno;
×
1110
    tscError("failed to init conv");
×
1111
    return;
×
1112
  }
1113
#endif
1114
#if !defined(WINDOWS) && !defined(TD_ASTRA)
1115
  ENV_ERR_RET(tzInit(), "failed to init timezone");
1,741,948✔
1116
#endif
1117
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
1,741,948✔
1118
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
1,741,948✔
1119

1120
  if (InitRegexCache() != 0) {
1,741,948✔
1121
    tscInitRes = terrno;
×
1122
    (void)printf("failed to init regex cache\n");
×
1123
    return;
×
1124
  }
1125

1126
  tscInfo("starting to initialize TAOS driver");
1,741,948✔
1127

1128
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
1,741,948✔
1129
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
1,741,948✔
1130
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
1,741,948✔
1131
  ENV_ERR_RET(initClientId(), "failed to init clientId");
1,741,948✔
1132

1133
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
1,741,948✔
1134
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
1,741,948✔
1135
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
1,741,948✔
1136

1137
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
1,741,948✔
1138
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
1,741,948✔
1139

1140
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
1,741,948✔
1141
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
1,741,948✔
1142
#ifdef USE_REPORT
1143
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
1,741,948✔
1144
#endif
1145
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
1,741,948✔
1146
#ifdef TAOSD_INTEGRATED
1147
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
1148
#endif
1149

1150
  if (tsSessionControl) {
1,741,948✔
1151
    ENV_ERR_RET(sessMgtInit(), "failed to init session management");
×
1152
  }
1153

1154
  tscInfo("TAOS driver is initialized successfully");
1,741,948✔
1155
}
1156

1157
int taos_init() {
2,423,510✔
1158
  (void)taosThreadOnce(&tscinit, taos_init_imp);
2,423,510✔
1159
  return tscInitRes;
2,423,510✔
1160
}
1161

1162
/*
 * Map a TSDB_OPTION value to the name of the configuration item it sets.
 *
 * Returns NULL for options that have no mapping here.
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      break;
  }

  return NULL;
}
1187

1188
/*
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR only copies str into configDir and does not trigger
 * initialization. On non-Windows builds a path longer than one character that
 * does not start with a quote character is wrapped in double quotes first.
 * A NULL str is rejected for this option.
 *
 * Every other option initializes the driver through taos_init(), resolves the
 * configuration item name with getCfgName() and stores str via cfgSetItem().
 * For the shell activity timer and the adapter switch the dynamic option
 * handler is invoked afterwards.
 *
 * Returns 0 on success, -1 for invalid input or failed initialization,
 * otherwise the code reported by the configuration calls.
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (option == TSDB_OPTION_CONFIGDIR) {
    if (str == NULL) {
      tscError("Invalid config dir: NULL");
      return -1;
    }
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // newstr must hold: opening quote + str + closing quote + terminator.
      if (len + 2 >= (size_t)PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", configDir, str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1242

1243
/**
1244
 * The request id is an unsigned integer format of 64bit.
1245
 *+------------+-----+-----------+---------------+
1246
 *| uid|localIp| PId | timestamp | serial number |
1247
 *+------------+-----+-----------+---------------+
1248
 *| 12bit      |12bit|24bit      |16bit          |
1249
 *+------------+-----+-----------+---------------+
1250
 * @return
1251
 */
1252
uint64_t generateRequestId() {
792,453,196✔
1253
  static uint32_t hashId = 0;
1254
  static int32_t  requestSerialId = 0;
1255

1256
  if (hashId == 0) {
792,453,196✔
1257
    int32_t code = taosGetSystemUUIDU32(&hashId);
1,740,786✔
1258
    if (code != TSDB_CODE_SUCCESS) {
1,740,786✔
1259
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1260
               tstrerror(code));
1261
    }
1262
  }
1263

1264
  uint64_t id = 0;
792,458,856✔
1265

1266
  while (true) {
×
1267
    int64_t  ts = taosGetTimestampMs();
792,463,741✔
1268
    uint64_t pid = taosGetPId();
792,463,741✔
1269
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
792,460,046✔
1270
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
792,467,973✔
1271

1272
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
792,468,496✔
1273
    if (id) {
792,468,496✔
1274
      break;
792,468,496✔
1275
    }
1276
  }
1277
  return id;
792,468,496✔
1278
}
1279

1280
#if 0
1281
#include "cJSON.h"
1282
static setConfRet taos_set_config_imp(const char *config){
1283
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1284
  static bool setConfFlag = false;
1285
  if (setConfFlag) {
1286
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1287
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1288
    return ret;
1289
  }
1290
  taosInitGlobalCfg();
1291
  cJSON *root = cJSON_Parse(config);
1292
  if (root == NULL){
1293
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1294
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1295
    return ret;
1296
  }
1297

1298
  int size = cJSON_GetArraySize(root);
1299
  if(!cJSON_IsObject(root) || size == 0) {
1300
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1301
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1302
    return ret;
1303
  }
1304

1305
  if(size >= 1000) {
1306
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1307
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1308
    return ret;
1309
  }
1310

1311
  for(int i = 0; i < size; i++){
1312
    cJSON *item = cJSON_GetArrayItem(root, i);
1313
    if(!item) {
1314
      ret.retCode = SET_CONF_RET_ERR_INNER;
1315
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1316
      return ret;
1317
    }
1318
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1319
      ret.retCode = SET_CONF_RET_ERR_PART;
1320
      if (strlen(ret.retMsg) == 0){
1321
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1322
      }else{
1323
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1324
        size_t leftSize = tmp >= 0 ? tmp : 0;
1325
        strncat(ret.retMsg, "|",  leftSize);
1326
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1327
        leftSize = tmp >= 0 ? tmp : 0;
1328
        strncat(ret.retMsg, item->string, leftSize);
1329
      }
1330
    }
1331
  }
1332
  cJSON_Delete(root);
1333
  setConfFlag = true;
1334
  return ret;
1335
}
1336

1337
setConfRet taos_set_config(const char *config){
1338
  taosThreadMutexLock(&setConfMutex);
1339
  setConfRet ret = taos_set_config_imp(config);
1340
  taosThreadMutexUnlock(&setConfMutex);
1341
  return ret;
1342
}
1343
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc