• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5052

13 May 2026 12:00PM UTC coverage: 73.338% (-0.02%) from 73.358%
#5052

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

761 existing lines in 163 files now uncovered.

281469 of 383795 relevant lines covered (73.34%)

134502812.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.21
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "clientSession.h"
42
#include "cus_name.h"
43

44
#define ENV_JSON_FALSE_CHECK(c)                     \
45
  do {                                              \
46
    if (!c) {                                       \
47
      tscError("faild to add item to JSON object"); \
48
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;      \
49
      goto _end;                                    \
50
    }                                               \
51
  } while (0)
52

53
#define ENV_ERR_RET(c, info)          \
54
  do {                                \
55
    int32_t _code = (c);              \
56
    if (_code != TSDB_CODE_SUCCESS) { \
57
      terrno = _code;                 \
58
      tscInitRes = _code;             \
59
      tscError(info);                 \
60
      return;                         \
61
    }                                 \
62
  } while (0)
63

64
STscDbg   tscDbg = {0};
65
SAppInfo  appInfo;
66
int64_t   lastClusterId = 0;
67
int32_t   clientReqRefPool = -1;
68
int32_t   clientConnRefPool = -1;
69
int32_t   clientStop = -1;
70
SHashObj *pTimezoneMap = NULL;
71

72
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
73
volatile int32_t    tscInitRes = 0;
74

75
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
1,268,917,440✔
76
  int32_t code = TSDB_CODE_SUCCESS;
1,268,917,440✔
77
  // connection has been released already, abort creating request.
78
  if (!mayCreateAsyncWork()) {
1,268,917,440✔
79
    return TSDB_CODE_APP_IS_STOPPING;
1,702✔
80
  }
81
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
1,268,928,430✔
82
  if (pRequest->self < 0) {
1,268,968,915✔
83
    tscError("failed to add ref to request");
×
84
    code = terrno;
×
85
    return code;
×
86
  }
87

88
  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
1,268,945,911✔
89

90
  if (pTscObj->pAppInfo) {
1,269,005,882✔
91
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;
1,268,998,106✔
92

93
    int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
1,268,998,088✔
94
    int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
1,269,003,334✔
95
    tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64
1,269,013,006✔
96
             ", current:%d, app current:%d, total:%d, QID:0x%" PRIx64,
97
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
98
  }
99

100
  return code;
1,268,944,860✔
101
}
102

103
static void concatStrings(SArray *list, char *buf, int size) {
×
104
  int len = 0;
×
105
  for (int i = 0; i < taosArrayGetSize(list); i++) {
×
106
    char *db = taosArrayGet(list, i);
×
107
    if (NULL == db) {
×
108
      tscError("get dbname failed, buf:%s", buf);
×
109
      break;
×
110
    }
111
    char *dot = strchr(db, '.');
×
112
    if (dot != NULL) {
×
113
      db = dot + 1;
×
114
    }
115
    if (i != 0) {
×
116
      (void)strncat(buf, ",", size - 1 - len);
×
117
      len += 1;
×
118
    }
119
    int ret = snprintf(buf + len, size - len, "%s", db);
×
120
    if (ret < 0) {
×
121
      tscError("snprintf failed, buf:%s, ret:%d", buf, ret);
×
122
      break;
×
123
    }
124
    len += ret;
×
125
    if (len >= size) {
×
126
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
×
127
      break;
×
128
    }
129
  }
130
}
×
131
#ifdef USE_REPORT
132
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
×
133
  cJSON  *json = cJSON_CreateObject();
×
134
  int32_t code = TSDB_CODE_SUCCESS;
×
135
  if (json == NULL) {
×
136
    tscError("failed to create monitor json");
×
137
    return TSDB_CODE_OUT_OF_MEMORY;
×
138
  }
139
  char clusterId[32] = {0};
×
140
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
×
141
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
×
142
    code = TSDB_CODE_FAILED;
×
143
    goto _end;
×
144
  }
145

146
  char startTs[32] = {0};
×
147
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
×
148
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
×
149
    code = TSDB_CODE_FAILED;
×
150
    goto _end;
×
151
  }
152

153
  char requestId[32] = {0};
×
154
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
×
155
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
×
156
    code = TSDB_CODE_FAILED;
×
157
    goto _end;
×
158
  }
159
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
×
160
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
×
161
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
×
162
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
×
163
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
×
164
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
×
165
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
×
166
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
×
167
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
168
  if (pRequest->sqlstr != NULL &&
×
169
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
×
170
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
×
171
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
×
172
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
×
173
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
×
174
  } else {
175
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
×
176
  }
177

178
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
×
179
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
×
180
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));
×
181

182
  char pid[32] = {0};
×
183
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
×
184
    tscError("failed to generate pid:%d", appInfo.pid);
×
185
    code = TSDB_CODE_FAILED;
×
186
    goto _end;
×
187
  }
188

189
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
×
190
  if (pRequest->dbList != NULL) {
×
191
    char dbList[1024] = {0};
×
192
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
×
193
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
×
194
  } else if (pRequest->pDb != NULL) {
×
195
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
×
196
  } else {
197
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
×
198
  }
199

200
  char *value = cJSON_PrintUnformatted(json);
×
201
  if (value == NULL || strlen(value) == 0) {
×
202
    tscError("failed to print json, data:%s", value == NULL ? "null" : value);
×
203
    code = TSDB_CODE_FAILED;
×
204
    goto _end;
×
205
  }
206
  MonitorSlowLogData data = {0};
×
207
  data.clusterId = pTscObj->pAppInfo->clusterId;
×
208
  data.type = SLOW_LOG_WRITE;
×
209
  data.data = value;
×
210
  code = monitorPutData2MonitorQueue(data);
×
211
  if (TSDB_CODE_SUCCESS != code) {
×
212
    taosMemoryFree(value);
×
213
    goto _end;
×
214
  }
215

216
_end:
×
217
  cJSON_Delete(json);
×
218
  return code;
×
219
}
220
#endif
221
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
416,608✔
222
  if (pRequest->pDb != NULL) {
416,608✔
223
    return strcmp(pRequest->pDb, exceptDb) != 0;
234,830✔
224
  }
225

226
  for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) {
327,119✔
227
    char *db = taosArrayGet(pRequest->dbList, i);
145,341✔
228
    if (NULL == db) {
145,341✔
229
      tscError("get dbname failed, exceptDb:%s", exceptDb);
×
230
      return false;
×
231
    }
232
    char *dot = strchr(db, '.');
145,341✔
233
    if (dot != NULL) {
145,341✔
234
      db = dot + 1;
145,341✔
235
    }
236
    if (strcmp(db, exceptDb) == 0) {
145,341✔
237
      return false;
×
238
    }
239
  }
240
  return true;
181,778✔
241
}
242

243
static void deregisterRequest(SRequestObj *pRequest) {
1,268,721,149✔
244
  if (pRequest == NULL) {
1,268,721,149✔
245
    tscError("pRequest == NULL");
×
246
    return;
×
247
  }
248

249
  STscObj            *pTscObj = pRequest->pTscObj;
1,268,721,149✔
250
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
1,268,729,379✔
251

252
  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
1,268,748,327✔
253
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
1,268,815,238✔
254
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;
1,268,815,889✔
255

256
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
1,268,789,278✔
257
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
1,268,790,123✔
258
           ", elapsed:%.2f ms, current:%d, app current:%d",
259
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
260

261
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
1,268,790,693✔
262
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
1,268,764,394✔
263
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
762,624,857✔
264
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
574,732,326✔
265
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
770,271,299✔
266
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
267
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
268
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
269
               pRequest->requestId);
270
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
770,271,299✔
271
      reqType = SLOW_LOG_TYPE_INSERT;
770,291,484✔
272
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
498,511,578✔
273
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
186,839,234✔
274
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
275
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
276
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
277
               pRequest->requestId);
278

279
      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
186,839,234✔
280
      reqType = SLOW_LOG_TYPE_QUERY;
186,839,459✔
281
    }
282

283
    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
1,268,803,591✔
284
      tscError("failed to release allocator");
×
285
    }
286
  }
287

288
#ifdef USE_REPORT
289
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
1,268,755,919✔
290
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
539,728✔
291
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
208,093✔
292
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
331,635✔
293
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
76,974✔
294
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
254,661✔
295
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
951✔
296
    }
297
  }
298

299
  if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL) &&
1,269,174,884✔
300
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
416,608✔
301
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
416,608✔
302
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
416,608✔
303
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
190,642✔
304
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
305
                       pRequest->sqlstr);
306
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
190,642✔
307
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
×
308
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
×
309
          tscError("failed to generate write slow log");
×
310
        }
311
      }
312
    }
313
  }
314
#endif
315

316
  releaseTscObj(pTscObj->id);
1,268,761,215✔
317
}
318

319
// todo close the transporter properly
320
void closeTransporter(SAppInstInfo *pAppInfo) {
1,704,561✔
321
  if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
1,704,561✔
322
    return;
×
323
  }
324

325
  void *pTransporter = pAppInfo->pTransporter;
1,704,561✔
326
  pAppInfo->pTransporter = NULL;
1,704,561✔
327

328
  tscDebug("free transporter:%p in app inst %p", pTransporter, pAppInfo);
1,704,561✔
329
  rpcClose(pTransporter);
1,704,561✔
330
}
331

332
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
52,348,571✔
333
  if (NEED_REDIRECT_ERROR(code)) {
52,348,571✔
334
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
16,409,218✔
335
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK ||
16,409,254✔
336
        msgType == TDMT_SCH_TASK_NOTIFY) {
UNCOV
337
      return false;
×
338
    }
339
    return true;
16,409,254✔
340
  } else if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY || code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE ||
35,939,353✔
341
             code == TSDB_CODE_SYN_WRITE_STALL || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
35,938,986✔
342
             code == TSDB_CODE_SYN_RESTORING) {
343
    tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
403✔
344
    return true;
×
345
  } else {
346
    return false;
35,938,950✔
347
  }
348
}
349

350
// start timer for particular msgType
351
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
×
352
  if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
×
353
    return true;
×
354
  }
355
  return false;
×
356
}
357

358
// TODO refactor
359
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
1,705,597✔
360
  SRpcInit rpcInit;
1,686,747✔
361
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
1,705,597✔
362
  rpcInit.localPort = 0;
1,705,597✔
363
  rpcInit.label = "TSC";
1,705,597✔
364
  rpcInit.numOfThreads = tsNumOfRpcThreads;
1,705,597✔
365
  rpcInit.cfp = processMsgFromServer;
1,705,597✔
366
  rpcInit.rfp = clientRpcRfp;
1,705,597✔
367
  rpcInit.sessions = 1024;
1,705,597✔
368
  rpcInit.connType = TAOS_CONN_CLIENT;
1,705,597✔
369
  rpcInit.user = (char *)(user ? user : auth);
1,705,597✔
370
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,705,597✔
371
  rpcInit.compressSize = tsCompressMsgSize;
1,705,597✔
372
  rpcInit.dfp = destroyAhandle;
1,705,597✔
373

374
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,705,597✔
375
  rpcInit.retryStepFactor = tsRedirectFactor;
1,705,597✔
376
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,705,597✔
377
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,705,597✔
378

379
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
1,705,597✔
380
  connLimitNum = TMAX(connLimitNum, 10);
1,705,597✔
381
  connLimitNum = TMIN(connLimitNum, 1000);
1,705,597✔
382
  rpcInit.connLimitNum = connLimitNum;
1,705,597✔
383
  rpcInit.shareConnLimit = tsShareConnLimit;
1,705,597✔
384
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,705,597✔
385
  rpcInit.startReadTimer = 1;
1,705,597✔
386
  rpcInit.readTimeout = tsReadTimeout;
1,705,597✔
387
  rpcInit.ipv6 = tsEnableIpv6;
1,705,597✔
388
  rpcInit.enableSSL = tsEnableTLS;
1,705,597✔
389
  rpcInit.enableSasl = tsEnableSasl;
1,705,597✔
390
  rpcInit.isToken = user == NULL ? 1 : 0;
1,705,597✔
391

392
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,705,597✔
393
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,705,597✔
394
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,705,597✔
395
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,705,597✔
396
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,705,597✔
397

398
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
1,705,597✔
399
  if (TSDB_CODE_SUCCESS != code) {
1,705,597✔
400
    tscError("invalid version string.");
×
401
    return code;
×
402
  }
403

404
  tscInfo("rpc max retry timeout %" PRId64 "", rpcInit.retryMaxTimeout);
1,705,597✔
405
  *pDnodeConn = rpcOpen(&rpcInit);
1,705,597✔
406
  if (*pDnodeConn == NULL) {
1,705,597✔
407
    tscError("failed to init connection to server since %s", tstrerror(terrno));
188✔
408
    code = terrno;
188✔
409
  }
410

411
  return code;
1,705,597✔
412
}
413

414
void destroyAllRequests(SHashObj *pRequests) {
87,384,802✔
415
  void *pIter = taosHashIterate(pRequests, NULL);
87,384,802✔
416
  while (pIter != NULL) {
87,388,337✔
417
    int64_t *rid = pIter;
×
418

419
    SRequestObj *pRequest = acquireRequest(*rid);
×
420
    if (pRequest) {
×
421
      destroyRequest(pRequest);
×
422
      (void)releaseRequest(*rid);  // ignore error
×
423
    }
424

425
    pIter = taosHashIterate(pRequests, pIter);
×
426
  }
427
}
87,388,337✔
428

429
void stopAllRequests(SHashObj *pRequests) {
67✔
430
  void *pIter = taosHashIterate(pRequests, NULL);
67✔
431
  while (pIter != NULL) {
67✔
432
    int64_t *rid = pIter;
×
433

434
    SRequestObj *pRequest = acquireRequest(*rid);
×
435
    if (pRequest) {
×
436
      taos_stop_query(pRequest);
×
437
      (void)releaseRequest(*rid);  // ignore error
×
438
    }
439

440
    pIter = taosHashIterate(pRequests, pIter);
×
441
  }
442
}
67✔
443

444
void destroyAppInst(void *info) {
1,704,561✔
445
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
1,704,561✔
446
  tscInfo("destroy app inst mgr %p", pAppInfo);
1,704,561✔
447

448
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
1,704,561✔
449
  if (TSDB_CODE_SUCCESS != code) {
1,704,561✔
450
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
451
  }
452

453
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
1,704,561✔
454

455
  code = taosThreadMutexUnlock(&appInfo.mutex);
1,704,561✔
456
  if (TSDB_CODE_SUCCESS != code) {
1,704,561✔
457
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
458
  }
459

460
  taosMemoryFreeClear(pAppInfo->instKey);
1,704,561✔
461
  closeTransporter(pAppInfo);
1,704,561✔
462

463
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
1,704,561✔
464
  if (TSDB_CODE_SUCCESS != code) {
1,704,561✔
465
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
466
  }
467

468
  taosArrayDestroy(pAppInfo->pQnodeList);
1,704,561✔
469
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
1,704,561✔
470
  if (TSDB_CODE_SUCCESS != code) {
1,704,561✔
471
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
472
  }
473

474
  taosMemoryFree(pAppInfo);
1,704,561✔
475
}
1,704,561✔
476

477
//  tscObj 1--->conn1
478
/// tscObj 2-->conn1
479
//  tscObj 3-->conn1
480

481
void destroyTscObj(void *pObj) {
87,385,729✔
482
  if (NULL == pObj) {
87,385,729✔
483
    return;
×
484
  }
485

486
  STscObj *pTscObj = pObj;
87,385,729✔
487
  int64_t  tscId = pTscObj->id;
87,385,729✔
488
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
87,385,729✔
489

490
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
87,385,729✔
491
  hbDeregisterConn(pTscObj, connKey);
87,385,729✔
492

493
  destroyAllRequests(pTscObj->pRequests);
87,388,337✔
494
  taosHashCleanup(pTscObj->pRequests);
87,388,339✔
495

496
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
87,388,343✔
497
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
87,387,752✔
498
           pTscObj->pAppInfo->numOfConns);
499

500
  // In any cases, we should not free app inst here. Or an race condition rises.
501
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
87,387,752✔
502

503
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
87,387,908✔
504
  taosMemoryFree(pTscObj);
87,386,894✔
505

506
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
87,387,599✔
507
}
508

509
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
87,601,001✔
510
                     STscObj **pObj) {
511
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
87,601,001✔
512
  if (NULL == *pObj) {
87,597,321✔
513
    return terrno;
×
514
  }
515

516
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
87,597,321✔
517
  if (NULL == (*pObj)->pRequests) {
87,598,880✔
518
    taosMemoryFree(*pObj);
×
519
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
520
  }
521

522
  (*pObj)->connType = connType;
87,598,880✔
523
  (*pObj)->pAppInfo = pAppInfo;
87,598,880✔
524
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
87,598,880✔
525
  if (user == NULL) {
87,598,877✔
526
    (*pObj)->user[0] = 0;
2,715✔
527
    (*pObj)->pass[0] = 0;
2,715✔
528
    tstrncpy((*pObj)->token, auth, sizeof((*pObj)->token));
2,715✔
529
  } else {
530
    tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
87,596,162✔
531
    (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
87,596,165✔
532
  }
533
  (*pObj)->tokenName[0] = 0;
87,598,881✔
534
  (*pObj)->enable = 1;  // enabled by default
87,598,880✔
535

536
  if (db != NULL) {
87,598,880✔
537
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
87,601,462✔
538
  }
539

540
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
87,598,879✔
541

542
  int32_t code = TSDB_CODE_SUCCESS;
87,601,239✔
543

544
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
87,601,239✔
545
  if ((*pObj)->id < 0) {
87,600,834✔
546
    tscError("failed to add object to clientConnRefPool");
×
547
    code = terrno;
×
548
    taosMemoryFree(*pObj);
×
549
    return code;
×
550
  }
551

552
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
87,600,866✔
553

554
  updateConnAccessInfo(&(*pObj)->sessInfo);
87,601,084✔
555
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
87,601,309✔
556
  return code;
87,601,548✔
557
}
558

559
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
2,147,483,647✔
560

561
void releaseTscObj(int64_t rid) {
2,147,483,647✔
562
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
2,147,483,647✔
563
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
564
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
565
  }
566
}
2,147,483,647✔
567

568
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
1,268,934,192✔
569
  int32_t code = TSDB_CODE_SUCCESS;
1,268,934,192✔
570
  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
1,268,934,192✔
571
  if (NULL == *pRequest) {
1,268,909,064✔
572
    return terrno;
×
573
  }
574

575
  STscObj *pTscObj = acquireTscObj(connId);
1,268,917,095✔
576
  if (pTscObj == NULL) {
1,269,020,240✔
577
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
×
578
  }
579
  if (pTscObj->enable == 0) {
1,269,020,240✔
580
    releaseTscObj(connId);
1,253✔
581
    TSC_ERR_JRET(TSDB_CODE_MND_USER_DISABLED);
1,253✔
582
  }
583
  SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
1,268,999,510✔
584
  if (interParam == NULL) {
1,269,001,204✔
585
    releaseTscObj(connId);
×
586
    TSC_ERR_JRET(terrno);
×
587
  }
588
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
1,269,001,204✔
589
  interParam->pRequest = *pRequest;
1,268,967,679✔
590
  (*pRequest)->body.interParam = interParam;
1,268,987,153✔
591

592
  (*pRequest)->resType = RES_TYPE__QUERY;
1,268,979,840✔
593
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
1,268,991,071✔
594
  (*pRequest)->metric.start = taosGetTimestampUs();
2,147,483,647✔
595
  (*pRequest)->execPhase = QUERY_PHASE_NONE;
1,268,997,026✔
596
  (*pRequest)->phaseStartTime = 0;
1,268,988,797✔
597

598
  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
1,268,972,058✔
599
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
1,268,976,764✔
600
  (*pRequest)->type = type;
1,268,985,595✔
601
  (*pRequest)->allocatorRefId = -1;
1,268,991,155✔
602

603
  (*pRequest)->pDb = getDbOfConnection(pTscObj);
1,269,002,930✔
604
  if (NULL == (*pRequest)->pDb) {
1,268,964,100✔
605
    TSC_ERR_JRET(terrno);
288,733,200✔
606
  }
607
  (*pRequest)->pTscObj = pTscObj;
1,268,963,821✔
608
  (*pRequest)->inCallback = false;
1,268,991,501✔
609
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
1,268,962,593✔
610
  if (NULL == (*pRequest)->msgBuf) {
1,268,892,093✔
611
    code = terrno;
×
612
    goto _return;
×
613
  }
614
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
1,268,930,701✔
615
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
1,268,950,960✔
616
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));
1,268,978,508✔
617

618
  return TSDB_CODE_SUCCESS;
1,268,945,770✔
619
_return:
2,955✔
620
  if ((*pRequest)->pTscObj) {
2,955✔
621
    doDestroyRequest(*pRequest);
1,702✔
622
  } else {
623
    taosMemoryFree(*pRequest);
1,253✔
624
  }
625
  return code;
2,955✔
626
}
627

628
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
1,402,438,937✔
629
  taosMemoryFreeClear(pResInfo->pRspMsg);
1,402,438,937✔
630
  taosMemoryFreeClear(pResInfo->length);
1,402,460,850✔
631
  taosMemoryFreeClear(pResInfo->row);
1,402,446,933✔
632
  taosMemoryFreeClear(pResInfo->pCol);
1,402,454,657✔
633
  taosMemoryFreeClear(pResInfo->fields);
1,402,429,860✔
634
  taosMemoryFreeClear(pResInfo->userFields);
1,402,446,166✔
635
  taosMemoryFreeClear(pResInfo->convertJson);
1,402,438,710✔
636
  taosMemoryFreeClear(pResInfo->decompBuf);
1,402,445,349✔
637

638
  if (pResInfo->convertBuf != NULL) {
1,402,426,044✔
639
    for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,228,438,013✔
640
      taosMemoryFreeClear(pResInfo->convertBuf[i]);
1,008,719,512✔
641
    }
642
    taosMemoryFreeClear(pResInfo->convertBuf);
219,720,301✔
643
  }
644
}
1,402,449,861✔
645

646
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
2,147,483,647✔
647

648
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
2,147,483,647✔
649

650
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
1,268,763,941✔
651

652
/// return the most previous req ref id
653
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
1,268,766,992✔
654
  int64_t      mostPrevReqRefId = pRequest->self;
1,268,766,992✔
655
  SRequestObj *pTmp = pRequest;
1,268,785,267✔
656
  while (pTmp->relation.prevRefId) {
1,268,785,267✔
657
    pTmp = acquireRequest(pTmp->relation.prevRefId);
×
658
    if (pTmp) {
×
659
      mostPrevReqRefId = pTmp->self;
×
660
      (void)releaseRequest(mostPrevReqRefId);  // ignore error
×
661
    } else {
662
      break;
×
663
    }
664
  }
665
  (void)removeRequest(mostPrevReqRefId);  // ignore error
1,268,784,382✔
666
  return mostPrevReqRefId;
1,268,807,125✔
667
}
668

669
void destroyNextReq(int64_t nextRefId) {
1,268,763,923✔
670
  if (nextRefId) {
1,268,763,923✔
671
    SRequestObj *pObj = acquireRequest(nextRefId);
×
672
    if (pObj) {
×
673
      (void)releaseRequest(nextRefId);  // ignore error
×
674
      (void)releaseRequest(nextRefId);  // ignore error
×
675
    }
676
  }
677
}
1,268,763,923✔
678

679
void destroySubRequests(SRequestObj *pRequest) {
×
680
  int32_t      reqIdx = -1;
×
681
  SRequestObj *pReqList[16] = {NULL};
×
682
  uint64_t     tmpRefId = 0;
×
683

684
  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
×
685
    return;
×
686
  }
687

688
  SRequestObj *pTmp = pRequest;
×
689
  while (pTmp->relation.prevRefId) {
×
690
    tmpRefId = pTmp->relation.prevRefId;
×
691
    pTmp = acquireRequest(tmpRefId);
×
692
    if (pTmp) {
×
693
      pReqList[++reqIdx] = pTmp;
×
694
      (void)releaseRequest(tmpRefId);  // ignore error
×
695
    } else {
696
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
×
697
      break;
×
698
    }
699
  }
700

701
  for (int32_t i = reqIdx; i >= 0; i--) {
×
702
    (void)removeRequest(pReqList[i]->self);  // ignore error
×
703
  }
704

705
  tmpRefId = pRequest->relation.nextRefId;
×
706
  while (tmpRefId) {
×
707
    pTmp = acquireRequest(tmpRefId);
×
708
    if (pTmp) {
×
709
      tmpRefId = pTmp->relation.nextRefId;
×
710
      (void)removeRequest(pTmp->self);   // ignore error
×
711
      (void)releaseRequest(pTmp->self);  // ignore error
×
712
    } else {
713
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
×
714
      break;
×
715
    }
716
  }
717
}
718

719
void doDestroyRequest(void *p) {
1,268,780,064✔
720
  if (NULL == p) {
1,268,780,064✔
721
    return;
×
722
  }
723

724
  SRequestObj *pRequest = (SRequestObj *)p;
1,268,780,064✔
725

726
  uint64_t reqId = pRequest->requestId;
1,268,780,064✔
727
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
1,268,801,374✔
728

729
  int64_t nextReqRefId = pRequest->relation.nextRefId;
1,268,801,374✔
730

731
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
1,268,792,345✔
732
  if (TSDB_CODE_SUCCESS != code) {
1,268,807,335✔
733
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
87,602,402✔
734
  }
735
  schedulerFreeJob(&pRequest->body.queryJob, 0);
1,268,807,335✔
736

737
  destorySqlCallbackWrapper(pRequest->pWrapper);
1,268,799,179✔
738

739
  taosMemoryFreeClear(pRequest->msgBuf);
1,268,764,080✔
740

741
  doFreeReqResultInfo(&pRequest->body.resInfo);
1,268,758,790✔
742
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
1,268,738,116✔
743
    tscError("failed to destroy semaphore");
×
744
  }
745

746
  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = -1, .noCheck = 1};
1,268,779,514✔
747
  code = tscUpdateSessMetric(pRequest->pTscObj, &para);
1,268,744,915✔
748
  if (TSDB_CODE_SUCCESS != code) {
1,268,801,291✔
749
    tscError("failed to update session metric, code:%s", tstrerror(code));
×
750
  }
751

752
  taosArrayDestroy(pRequest->tableList);
1,268,801,291✔
753
  taosArrayDestroy(pRequest->targetTableList);
1,268,757,259✔
754
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
1,268,784,983✔
755

756
  if (pRequest->self) {
1,268,744,323✔
757
    deregisterRequest(pRequest);
1,268,789,058✔
758
  }
759

760
  taosMemoryFreeClear(pRequest->pDb);
1,268,795,127✔
761
  taosArrayDestroy(pRequest->dbList);
1,268,738,235✔
762
  if (pRequest->body.interParam) {
1,268,770,454✔
763
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
1,268,784,222✔
764
      tscError("failed to destroy semaphore in pRequest");
×
765
    }
766
  }
767
  taosMemoryFree(pRequest->body.interParam);
1,268,789,929✔
768

769
  qDestroyQuery(pRequest->pQuery);
1,268,778,627✔
770

771
  // `pRequest->parseMeta` may be filled during stmt parsing and must be released
772
  // when the request object is destroyed, otherwise LeakSanitizer will report
773
  // catalog async response result leaks.
774
  catalogFreeMetaData(&pRequest->parseMeta);
1,268,754,665✔
775
  TAOS_MEMSET(&pRequest->parseMeta, 0, sizeof(pRequest->parseMeta));
1,268,733,892✔
776
  nodesDestroyAllocator(pRequest->allocatorRefId);
1,268,752,992✔
777

778
  taosMemoryFreeClear(pRequest->effectiveUser);
1,268,744,391✔
779
  taosMemoryFreeClear(pRequest->sqlstr);
1,268,749,252✔
780
  taosMemoryFree(pRequest);
1,268,741,500✔
781
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
1,268,764,021✔
782
  destroyNextReq(nextReqRefId);
1,268,764,021✔
783
}
784

785
void destroyRequest(SRequestObj *pRequest) {
1,268,767,072✔
786
  if (pRequest == NULL) return;
1,268,767,072✔
787

788
  taos_stop_query(pRequest);
1,268,767,072✔
789
  (void)removeFromMostPrevReq(pRequest);
1,268,793,758✔
790
}
791

792
void taosStopQueryImpl(SRequestObj *pRequest) {
1,268,770,484✔
793
  pRequest->killed = true;
1,268,770,484✔
794

795
  // It is not a query, no need to stop.
796
  if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
1,268,798,484✔
797
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
297,924,013✔
798
    return;
297,954,483✔
799
  }
800

801
  schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
970,842,021✔
802
  tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
970,847,436✔
803
}
804

805
void stopAllQueries(SRequestObj *pRequest) {
1,268,762,245✔
806
  int32_t      reqIdx = -1;
1,268,762,245✔
807
  SRequestObj *pReqList[16] = {NULL};
1,268,762,245✔
808
  uint64_t     tmpRefId = 0;
1,268,765,227✔
809

810
  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
1,268,765,227✔
811
    return;
×
812
  }
813

814
  SRequestObj *pTmp = pRequest;
1,268,789,761✔
815
  while (pTmp->relation.prevRefId) {
1,268,789,761✔
816
    tmpRefId = pTmp->relation.prevRefId;
×
817
    pTmp = acquireRequest(tmpRefId);
×
818
    if (pTmp) {
×
819
      pReqList[++reqIdx] = pTmp;
×
820
    } else {
821
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
×
822
      break;
×
823
    }
824
  }
825

826
  for (int32_t i = reqIdx; i >= 0; i--) {
1,268,785,624✔
827
    taosStopQueryImpl(pReqList[i]);
×
828
    (void)releaseRequest(pReqList[i]->self);  // ignore error
×
829
  }
830

831
  taosStopQueryImpl(pRequest);
1,268,785,624✔
832

833
  tmpRefId = pRequest->relation.nextRefId;
1,268,799,585✔
834
  while (tmpRefId) {
1,268,776,061✔
835
    pTmp = acquireRequest(tmpRefId);
×
836
    if (pTmp) {
×
837
      tmpRefId = pTmp->relation.nextRefId;
×
838
      taosStopQueryImpl(pTmp);
×
839
      (void)releaseRequest(pTmp->self);  // ignore error
×
840
    } else {
841
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
×
842
      break;
×
843
    }
844
  }
845
}
846
#ifdef USE_REPORT
847
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
×
848

849
static void *tscCrashReportThreadFp(void *param) {
×
850
  int32_t code = 0;
×
851
  setThreadName("client-crashReport");
×
852
  char filepath[PATH_MAX] = {0};
×
853
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
854
  char     *pMsg = NULL;
×
855
  int64_t   msgLen = 0;
×
856
  TdFilePtr pFile = NULL;
×
857
  bool      truncateFile = false;
×
858
  int32_t   sleepTime = 200;
×
859
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
860
  int32_t   loopTimes = reportPeriodNum;
×
861

862
#ifdef WINDOWS
863
  if (taosCheckCurrentInDll()) {
864
    atexit(crashReportThreadFuncUnexpectedStopped);
865
  }
866
#endif
867

868
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
869
    return NULL;
×
870
  }
871
  STelemAddrMgmt mgt;
×
872
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
873
  if (code) {
×
874
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
875
    return NULL;
×
876
  }
877

878
  code = initCrashLogWriter();
×
879
  if (code) {
×
880
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
881
    return NULL;
×
882
  }
883

884
  while (1) {
885
    checkAndPrepareCrashInfo();
×
886
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
887
    if (loopTimes++ < reportPeriodNum) {
×
888
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
889
      taosMsleep(sleepTime);
×
890
      continue;
×
891
    }
892

893
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
894
    if (pMsg && msgLen > 0) {
×
895
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
896
        tscError("failed to send crash report");
×
897
        if (pFile) {
×
898
          taosReleaseCrashLogFile(pFile, false);
×
899
          pFile = NULL;
×
900

901
          taosMsleep(sleepTime);
×
902
          loopTimes = 0;
×
903
          continue;
×
904
        }
905
      } else {
906
        tscInfo("succeed to send crash report");
×
907
        truncateFile = true;
×
908
      }
909
    } else {
910
      tscInfo("no crash info was found");
×
911
    }
912

913
    taosMemoryFree(pMsg);
×
914

915
    if (pMsg && msgLen > 0) {
×
916
      pMsg = NULL;
×
917
      continue;
×
918
    }
919

920
    if (pFile) {
×
921
      taosReleaseCrashLogFile(pFile, truncateFile);
×
922
      pFile = NULL;
×
923
      truncateFile = false;
×
924
    }
925

926
    taosMsleep(sleepTime);
×
927
    loopTimes = 0;
×
928
  }
929
  taosTelemetryDestroy(&mgt);
×
930

931
  clientStop = -2;
×
932
  return NULL;
×
933
}
934

935
int32_t tscCrashReportInit() {
1,607,039✔
936
  if (!tsEnableCrashReport) {
1,607,039✔
937
    return TSDB_CODE_SUCCESS;
1,607,039✔
938
  }
939
  int32_t      code = TSDB_CODE_SUCCESS;
×
940
  TdThreadAttr thAttr;
×
941
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
942
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
943
  TdThread crashReportThread;
×
944
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
945
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
946
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
947
    TSC_ERR_RET(terrno);
×
948
  }
949

950
  (void)taosThreadAttrDestroy(&thAttr);
×
951
_return:
×
952
  if (code) {
×
953
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
954
    TSC_ERR_RET(terrno);
×
955
  }
956

957
  return code;
×
958
}
959

960
void tscStopCrashReport() {
1,606,252✔
961
  if (!tsEnableCrashReport) {
1,606,252✔
962
    return;
1,606,252✔
963
  }
964

965
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
966
    tscDebug("crash report thread already stopped");
×
967
    return;
×
968
  }
969

970
  while (atomic_load_32(&clientStop) > 0) {
×
971
    taosMsleep(100);
×
972
  }
973
}
974

975
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
976
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
977
}
×
978
#endif
979

980
#ifdef TAOSD_INTEGRATED
981
typedef struct {
982
  TdThread pid;
983
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
984
} SDaemonObj;
985

986
extern int  dmStartDaemon(int argc, char const *argv[]);
987
extern void dmStopDaemon();
988

989
SDaemonObj daemonObj = {0};
990

991
typedef struct {
992
  int32_t argc;
993
  char  **argv;
994
} SExecArgs;
995

996
static void *dmStartDaemonFunc(void *param) {
997
  int32_t    code = 0;
998
  SExecArgs *pArgs = (SExecArgs *)param;
999
  int32_t    argc = pArgs->argc;
1000
  char     **argv = pArgs->argv;
1001

1002
  code = dmStartDaemon(argc, (const char **)argv);
1003
  if (code != 0) {
1004
    printf("failed to start taosd since %s\r\n", tstrerror(code));
1005
    goto _exit;
1006
  }
1007

1008
_exit:
1009
  if (code != 0) {
1010
    atomic_store_32(&daemonObj.stat, code);
1011
  }
1012
  return NULL;
1013
}
1014

1015
static int32_t shellStartDaemon(int argc, char *argv[]) {
1016
  int32_t    code = 0, lino = 0;
1017
  SExecArgs *pArgs = NULL;
1018
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
1019

1020
  TdThreadAttr thAttr;
1021
  (void)taosThreadAttrInit(&thAttr);
1022
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1023
#ifdef TD_COMPACT_OS
1024
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
1025
#endif
1026
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
1027
  if (pArgs == NULL) {
1028
    code = terrno;
1029
    TAOS_CHECK_EXIT(code);
1030
  }
1031
  pArgs->argc = argc;
1032
  pArgs->argv = argv;
1033

1034
#ifndef TD_AS_LIB
1035
  tsLogEmbedded = 1;
1036
#endif
1037

1038
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
1039

1040
  while (true) {
1041
    if (atomic_load_64(&tsDndStart)) {
1042
      atomic_store_32(&daemonObj.stat, 1);
1043
      break;
1044
    }
1045
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
1046
    if (daemonstat < 0) {
1047
      code = daemonstat;
1048
      TAOS_CHECK_EXIT(code);
1049
    }
1050

1051
    if (daemonstat > 1) {
1052
      code = TSDB_CODE_APP_ERROR;
1053
      TAOS_CHECK_EXIT(code);
1054
    }
1055
    taosMsleep(1000);
1056
  }
1057

1058
_exit:
1059
  endMs = taosGetTimestampMs();
1060
  (void)taosThreadAttrDestroy(&thAttr);
1061
  taosMemoryFreeClear(pArgs);
1062
  if (code) {
1063
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1064
           endMs - startMs);
1065
  } else {
1066
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1067
  }
1068
#ifndef TD_AS_LIB
1069
  tsLogEmbedded = 0;
1070
#endif
1071
  return code;
1072
}
1073

1074
void shellStopDaemon() {
1075
#ifndef TD_AS_LIB
1076
  tsLogEmbedded = 1;
1077
#endif
1078
  dmStopDaemon();
1079
  if (taosCheckPthreadValid(daemonObj.pid)) {
1080
    (void)taosThreadJoin(daemonObj.pid, NULL);
1081
    taosThreadClear(&daemonObj.pid);
1082
  }
1083
}
1084
#endif
1085

1086
void taos_init_imp(void) {
1,607,100✔
1087
#if defined(LINUX)
1088
  if (tscDbg.memEnable) {
1,607,100✔
1089
    int32_t code = taosMemoryDbgInit();
×
1090
    if (code) {
×
1091
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
1092
    } else {
1093
      tsAsyncLog = false;
×
1094
      (void)printf("memory dbg enabled\n");
×
1095
    }
1096
  }
1097
#endif
1098

1099
  // In the APIs of other program language, taos_cleanup is not available yet.
1100
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
1101
  (void)atexit(taos_cleanup);
1,607,100✔
1102
  SET_ERRNO(TSDB_CODE_SUCCESS);
1,607,100✔
1103
  terrno = TSDB_CODE_SUCCESS;
1,607,100✔
1104
  taosSeedRand(taosGetTimestampSec());
1,607,100✔
1105

1106
  appInfo.pid = taosGetPId();
1,607,100✔
1107
  appInfo.startTime = taosGetTimestampMs();
1,607,100✔
1108
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,607,100✔
1109
  appInfo.pInstMapByClusterId =
1,607,100✔
1110
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,607,100✔
1111
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
1,607,100✔
1112
    (void)printf("failed to allocate memory when init appInfo\n");
×
1113
    tscInitRes = terrno;
×
1114
    return;
×
1115
  }
1116
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
1,607,100✔
1117

1118
  const char *logName = CUS_PROMPT "log";
1,607,100✔
1119
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
1,607,100✔
1120
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
1,607,100✔
1121
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
61✔
1122
    SET_ERROR_MSG("Create %s failed:%s. configDir=%s", logName, strerror(ERRNO), configDir);
61✔
1123
    tscInitRes = terrno;
61✔
1124
    return;
61✔
1125
  }
1126

1127
#ifdef TAOSD_INTEGRATED
1128
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
1129
#else
1130
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
1,607,039✔
1131
#endif
1132

1133
  initQueryModuleMsgHandle();
1,607,039✔
1134
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
1135
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
1,607,039✔
1136
    tscInitRes = terrno;
×
1137
    tscError("failed to init conv");
×
1138
    return;
×
1139
  }
1140
#endif
1141
#if !defined(TD_ASTRA)
1142
  ENV_ERR_RET(tzInit(), "failed to init timezone");
1,607,039✔
1143
#endif
1144
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
1,607,039✔
1145
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
1,607,039✔
1146

1147
  if (InitRegexCache() != 0) {
1,607,039✔
1148
    tscInitRes = terrno;
×
1149
    (void)printf("failed to init regex cache\n");
×
1150
    return;
×
1151
  }
1152

1153
  tscInfo("starting to initialize TAOS driver");
1,607,039✔
1154

1155
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
1,607,039✔
1156
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
1,607,039✔
1157
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
1,607,039✔
1158
  ENV_ERR_RET(initClientId(), "failed to init clientId");
1,607,039✔
1159

1160
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
1,607,039✔
1161
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
1,607,039✔
1162
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
1,607,039✔
1163

1164
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
1,607,039✔
1165
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
1,607,039✔
1166

1167
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
1,607,039✔
1168
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
1,607,039✔
1169
#ifdef USE_REPORT
1170
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
1,607,039✔
1171
#endif
1172
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
1,607,039✔
1173
#ifdef TAOSD_INTEGRATED
1174
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
1175
#endif
1176

1177
  if (tsSessionControl) {
1,607,039✔
1178
    ENV_ERR_RET(sessMgtInit(), "failed to init session management");
1,607,039✔
1179
  }
1180

1181
  tscInfo("TAOS driver is initialized successfully");
1,607,039✔
1182
}
1183

1184
int taos_init() {
87,534,877✔
1185
  (void)taosThreadOnce(&tscinit, taos_init_imp);
87,534,877✔
1186
  return tscInitRes;
87,537,495✔
1187
}
1188

1189
const char *getCfgName(TSDB_OPTION option) {
5,232✔
1190
  const char *name = NULL;
5,232✔
1191

1192
  switch (option) {
5,232✔
1193
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
138✔
1194
      name = "shellActivityTimer";
138✔
1195
      break;
138✔
1196
    case TSDB_OPTION_LOCALE:
×
1197
      name = "locale";
×
1198
      break;
×
1199
    case TSDB_OPTION_CHARSET:
×
1200
      name = "charset";
×
1201
      break;
×
1202
    case TSDB_OPTION_TIMEZONE:
4,302✔
1203
      name = "timezone";
4,302✔
1204
      break;
4,302✔
1205
    case TSDB_OPTION_USE_ADAPTER:
198✔
1206
      name = "useAdapter";
198✔
1207
      break;
198✔
1208
    default:
594✔
1209
      break;
594✔
1210
  }
1211

1212
  return name;
5,232✔
1213
}
1214

1215
int taos_options_imp(TSDB_OPTION option, const char *str) {
85,155,086✔
1216
  if (option == TSDB_OPTION_CONFIGDIR) {
85,155,086✔
1217
#ifndef WINDOWS
1218
    char newstr[PATH_MAX];
740,168✔
1219
    int  len = strlen(str);
85,149,854✔
1220
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
85,149,854✔
1221
      if (len + 2 >= PATH_MAX) {
85,148,722✔
1222
        tscError("Too long path %s", str);
×
1223
        return -1;
×
1224
      }
1225
      newstr[0] = '"';
85,148,722✔
1226
      (void)memcpy(newstr + 1, str, len);
85,148,722✔
1227
      newstr[len + 1] = '"';
85,148,722✔
1228
      newstr[len + 2] = '\0';
85,148,722✔
1229
      str = newstr;
85,148,722✔
1230
    }
1231
#endif
1232
    tstrncpy(configDir, str, PATH_MAX);
85,149,854✔
1233
    tscInfo("set cfg:%s to %s", configDir, str);
85,149,854✔
1234
    return 0;
85,149,854✔
1235
  }
1236

1237
  // initialize global config
1238
  if (taos_init() != 0) {
5,232✔
1239
    return -1;
×
1240
  }
1241

1242
  SConfig     *pCfg = taosGetCfg();
5,232✔
1243
  SConfigItem *pItem = NULL;
5,232✔
1244
  const char  *name = getCfgName(option);
5,232✔
1245

1246
  if (name == NULL) {
5,232✔
1247
    tscError("Invalid option %d", option);
594✔
1248
    return -1;
594✔
1249
  }
1250

1251
  pItem = cfgGetItem(pCfg, name);
4,638✔
1252
  if (pItem == NULL) {
4,638✔
1253
    tscError("Invalid option %d", option);
×
1254
    return -1;
×
1255
  }
1256

1257
  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
4,638✔
1258
  if (code != 0) {
4,638✔
1259
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
138✔
1260
  } else {
1261
    tscInfo("set cfg:%s to %s", name, str);
4,500✔
1262
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
4,500✔
1263
      code = taosCfgDynamicOptions(pCfg, name, false);
198✔
1264
    }
1265
  }
1266

1267
  return code;
4,638✔
1268
}
1269

1270
/**
1271
 * The request id is an unsigned integer format of 64bit.
1272
 *+------------+-----+-----------+---------------+
1273
 *| uid|localIp| PId | timestamp | serial number |
1274
 *+------------+-----+-----------+---------------+
1275
 *| 12bit      |12bit|24bit      |16bit          |
1276
 *+------------+-----+-----------+---------------+
1277
 * @return
1278
 */
1279
uint64_t generateRequestId() {
1,667,258,053✔
1280
  static uint32_t hashId = 0;
1281
  static int32_t  requestSerialId = 0;
1282

1283
  if (hashId == 0) {
1,667,258,053✔
1284
    int32_t code = taosGetSystemUUIDU32(&hashId);
1,604,946✔
1285
    if (code != TSDB_CODE_SUCCESS) {
1,604,946✔
1286
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1287
               tstrerror(code));
1288
    }
1289
  }
1290

1291
  uint64_t id = 0;
1,667,246,368✔
1292

1293
  while (true) {
×
1294
    int64_t  ts = taosGetTimestampMs();
1,667,305,201✔
1295
    uint64_t pid = taosGetPId();
1,667,305,201✔
1296
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
1,667,275,072✔
1297
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
1,667,335,357✔
1298

1299
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
1,667,343,935✔
1300
    if (id) {
1,667,343,935✔
1301
      break;
1,667,343,935✔
1302
    }
1303
  }
1304
  return id;
1,667,343,935✔
1305
}
1306

1307
#if 0
1308
#include "cJSON.h"
1309
static setConfRet taos_set_config_imp(const char *config){
1310
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1311
  static bool setConfFlag = false;
1312
  if (setConfFlag) {
1313
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1314
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1315
    return ret;
1316
  }
1317
  taosInitGlobalCfg();
1318
  cJSON *root = cJSON_Parse(config);
1319
  if (root == NULL){
1320
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1321
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1322
    return ret;
1323
  }
1324

1325
  int size = cJSON_GetArraySize(root);
1326
  if(!cJSON_IsObject(root) || size == 0) {
1327
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1328
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1329
    return ret;
1330
  }
1331

1332
  if(size >= 1000) {
1333
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1334
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1335
    return ret;
1336
  }
1337

1338
  for(int i = 0; i < size; i++){
1339
    cJSON *item = cJSON_GetArrayItem(root, i);
1340
    if(!item) {
1341
      ret.retCode = SET_CONF_RET_ERR_INNER;
1342
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1343
      return ret;
1344
    }
1345
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1346
      ret.retCode = SET_CONF_RET_ERR_PART;
1347
      if (strlen(ret.retMsg) == 0){
1348
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1349
      }else{
1350
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1351
        size_t leftSize = tmp >= 0 ? tmp : 0;
1352
        strncat(ret.retMsg, "|",  leftSize);
1353
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1354
        leftSize = tmp >= 0 ? tmp : 0;
1355
        strncat(ret.retMsg, item->string, leftSize);
1356
      }
1357
    }
1358
  }
1359
  cJSON_Delete(root);
1360
  setConfFlag = true;
1361
  return ret;
1362
}
1363

1364
setConfRet taos_set_config(const char *config){
1365
  taosThreadMutexLock(&setConfMutex);
1366
  setConfRet ret = taos_set_config_imp(config);
1367
  taosThreadMutexUnlock(&setConfMutex);
1368
  return ret;
1369
}
1370
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc