• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3840

04 Apr 2025 03:35PM UTC coverage: 63.027% (+0.6%) from 62.382%
#3840

push

travis-ci

web-flow
Merge pull request #30653 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

155471 of 315065 branches covered (49.35%)

Branch coverage included in aggregate %.

241637 of 314991 relevant lines covered (76.71%)

18825079.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.27
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42

43
#define TSC_VAR_NOT_RELEASE 1
44
#define TSC_VAR_RELEASED    0
45

46
#define ENV_JSON_FALSE_CHECK(c)                     \
47
  do {                                              \
48
    if (!c) {                                       \
49
      tscError("faild to add item to JSON object"); \
50
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;      \
51
      goto _end;                                    \
52
    }                                               \
53
  } while (0)
54

55
#define ENV_ERR_RET(c, info)          \
56
  do {                                \
57
    int32_t _code = (c);              \
58
    if (_code != TSDB_CODE_SUCCESS) { \
59
      terrno = _code;                 \
60
      tscInitRes = _code;             \
61
      tscError(info);                 \
62
      return;                         \
63
    }                                 \
64
  } while (0)
65

66
STscDbg   tscDbg = {0};
67
SAppInfo  appInfo;
68
int64_t   lastClusterId = 0;
69
int32_t   clientReqRefPool = -1;
70
int32_t   clientConnRefPool = -1;
71
int32_t   clientStop = -1;
72
SHashObj *pTimezoneMap = NULL;
73

74
int32_t timestampDeltaLimit = 900;  // s
75

76
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
77
volatile int32_t    tscInitRes = 0;
78

79
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
11,033,098✔
80
  int32_t code = TSDB_CODE_SUCCESS;
11,033,098✔
81
  // connection has been released already, abort creating request.
82
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
11,033,098✔
83
  if (pRequest->self < 0) {
11,038,402!
84
    tscError("failed to add ref to request");
×
85
    code = terrno;
×
86
    return code;
×
87
  }
88

89
  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
11,038,402✔
90

91
  if (pTscObj->pAppInfo) {
11,044,264!
92
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;
11,044,844✔
93

94
    int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
11,044,844✔
95
    int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
11,048,425✔
96
    tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64 ", current:%d, app current:%d, total:%d, QID:0x%" PRIx64,
11,046,252✔
97
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
98
  }
99

100
  return code;
11,046,573✔
101
}
102

103
static void concatStrings(SArray *list, char *buf, int size) {
42✔
104
  int len = 0;
42✔
105
  for (int i = 0; i < taosArrayGetSize(list); i++) {
105✔
106
    char *db = taosArrayGet(list, i);
63✔
107
    if (NULL == db) {
63!
108
      tscError("get dbname failed, buf:%s", buf);
×
109
      break;
×
110
    }
111
    char *dot = strchr(db, '.');
63✔
112
    if (dot != NULL) {
63!
113
      db = dot + 1;
63✔
114
    }
115
    if (i != 0) {
63✔
116
      (void)strncat(buf, ",", size - 1 - len);
21✔
117
      len += 1;
21✔
118
    }
119
    int ret = tsnprintf(buf + len, size - len, "%s", db);
63✔
120
    if (ret < 0) {
63!
121
      tscError("snprintf failed, buf:%s, ret:%d", buf, ret);
×
122
      break;
×
123
    }
124
    len += ret;
63✔
125
    if (len >= size) {
63!
126
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
×
127
      break;
×
128
    }
129
  }
130
}
42✔
131
#ifdef USE_REPORT
132
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
42✔
133
  cJSON  *json = cJSON_CreateObject();
42✔
134
  int32_t code = TSDB_CODE_SUCCESS;
42✔
135
  if (json == NULL) {
42!
136
    tscError("failed to create monitor json");
×
137
    return TSDB_CODE_OUT_OF_MEMORY;
×
138
  }
139
  char clusterId[32] = {0};
42✔
140
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
42!
141
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
×
142
    code = TSDB_CODE_FAILED;
×
143
    goto _end;
×
144
  }
145

146
  char startTs[32] = {0};
42✔
147
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
42!
148
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
×
149
    code = TSDB_CODE_FAILED;
×
150
    goto _end;
×
151
  }
152

153
  char requestId[32] = {0};
42✔
154
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
42!
155
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
×
156
    code = TSDB_CODE_FAILED;
×
157
    goto _end;
×
158
  }
159
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
42!
160
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
42!
161
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
42!
162
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
42!
163
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
42!
164
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
42!
165
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
42!
166
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
42!
167
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
168
  if (pRequest->sqlstr != NULL &&
42!
169
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
42!
170
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
×
171
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
×
172
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
×
173
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
×
174
  } else {
175
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
42!
176
  }
177

178
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
42!
179
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
42!
180
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));
42!
181

182
  char pid[32] = {0};
42✔
183
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
42!
184
    tscError("failed to generate pid:%d", appInfo.pid);
×
185
    code = TSDB_CODE_FAILED;
×
186
    goto _end;
×
187
  }
188

189
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
42!
190
  if (pRequest->dbList != NULL) {
42!
191
    char dbList[1024] = {0};
42✔
192
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
42✔
193
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
42!
194
  } else if (pRequest->pDb != NULL) {
×
195
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
×
196
  } else {
197
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
×
198
  }
199

200
  char *value = cJSON_PrintUnformatted(json);
42✔
201
  if (value == NULL) {
42!
202
    tscError("failed to print json");
×
203
    code = TSDB_CODE_FAILED;
×
204
    goto _end;
×
205
  }
206
  MonitorSlowLogData data = {0};
42✔
207
  data.clusterId = pTscObj->pAppInfo->clusterId;
42✔
208
  data.type = SLOW_LOG_WRITE;
42✔
209
  data.data = value;
42✔
210
  code = monitorPutData2MonitorQueue(data);
42✔
211
  if (TSDB_CODE_SUCCESS != code) {
42!
212
    taosMemoryFree(value);
×
213
    goto _end;
×
214
  }
215

216
_end:
42✔
217
  cJSON_Delete(json);
42✔
218
  return code;
42✔
219
}
220
#endif
221
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
1,081✔
222
  if (pRequest->pDb != NULL) {
1,081✔
223
    return strcmp(pRequest->pDb, exceptDb) != 0;
812✔
224
  }
225

226
  for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) {
379✔
227
    char *db = taosArrayGet(pRequest->dbList, i);
110✔
228
    if (NULL == db) {
110!
229
      tscError("get dbname failed, exceptDb:%s", exceptDb);
×
230
      return false;
×
231
    }
232
    char *dot = strchr(db, '.');
110✔
233
    if (dot != NULL) {
110!
234
      db = dot + 1;
110✔
235
    }
236
    if (strcmp(db, exceptDb) == 0) {
110!
237
      return false;
×
238
    }
239
  }
240
  return true;
269✔
241
}
242

243
static void deregisterRequest(SRequestObj *pRequest) {
11,022,412✔
244
  if (pRequest == NULL) {
11,022,412!
245
    tscError("pRequest == NULL");
×
246
    return;
×
247
  }
248

249
  STscObj            *pTscObj = pRequest->pTscObj;
11,022,412✔
250
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
11,022,412✔
251

252
  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
11,022,412✔
253
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
11,042,593✔
254
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;
11,041,381✔
255

256
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
11,026,354✔
257
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
11,026,354✔
258
           ", elapsed:%.2f ms, current:%d, app current:%d",
259
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
260

261
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
11,026,354!
262
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
11,025,772!
263
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
9,686,503✔
264
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
1,416,074✔
265
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
9,721,414✔
266
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
267
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
268
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
269
               pRequest->requestId);
270
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
9,721,414✔
271
      reqType = SLOW_LOG_TYPE_INSERT;
9,737,042✔
272
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
1,304,358✔
273
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
927,923✔
274
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
275
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
276
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
277
               pRequest->requestId);
278

279
      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
927,923✔
280
      reqType = SLOW_LOG_TYPE_QUERY;
927,922✔
281
    }
282

283
    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
11,041,399!
284
      tscError("failed to release allocator");
×
285
    }
286
  }
287

288
#ifdef USE_REPORT
289
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
11,033,462✔
290
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
8,730,864✔
291
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
8,381,305✔
292
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
349,559✔
293
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
227,275✔
294
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
122,284✔
295
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
101,779✔
296
    }
297
  }
298

299
  if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL) &&
11,051,359!
300
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
1,081✔
301
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
1,081✔
302
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
1,081✔
303
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
291✔
304
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
305
                       pRequest->sqlstr);
306
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
291✔
307
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
42✔
308
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
42!
309
          tscError("failed to generate write slow log");
×
310
        }
311
      }
312
    }
313
  }
314
#endif
315

316
  releaseTscObj(pTscObj->id);
11,050,278✔
317
}
318

319
// todo close the transporter properly
320
void closeTransporter(SAppInstInfo *pAppInfo) {
16,840✔
321
  if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
16,840!
322
    return;
×
323
  }
324

325
  tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
16,840✔
326
  rpcClose(pAppInfo->pTransporter);
16,840✔
327
}
328

329
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
1,116,175✔
330
  if (NEED_REDIRECT_ERROR(code)) {
1,116,175!
331
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
94,893!
332
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK ||
95,228!
333
        msgType == TDMT_SCH_TASK_NOTIFY) {
334
      return false;
×
335
    }
336
    return true;
95,228✔
337
  } else {
338
    return false;
1,021,282✔
339
  }
340
}
341

342
// start timer for particular msgType
343
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
×
344
  if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
×
345
    return true;
×
346
  }
347
  return false;
×
348
}
349

350
// TODO refactor
351
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
16,840✔
352
  SRpcInit rpcInit;
353
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
16,840✔
354
  rpcInit.localPort = 0;
16,840✔
355
  rpcInit.label = "TSC";
16,840✔
356
  rpcInit.numOfThreads = tsNumOfRpcThreads;
16,840✔
357
  rpcInit.cfp = processMsgFromServer;
16,840✔
358
  rpcInit.rfp = clientRpcRfp;
16,840✔
359
  rpcInit.sessions = 1024;
16,840✔
360
  rpcInit.connType = TAOS_CONN_CLIENT;
16,840✔
361
  rpcInit.user = (char *)user;
16,840✔
362
  rpcInit.idleTime = tsShellActivityTimer * 1000;
16,840✔
363
  rpcInit.compressSize = tsCompressMsgSize;
16,840✔
364
  rpcInit.dfp = destroyAhandle;
16,840✔
365

366
  rpcInit.retryMinInterval = tsRedirectPeriod;
16,840✔
367
  rpcInit.retryStepFactor = tsRedirectFactor;
16,840✔
368
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
16,840✔
369
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
16,840✔
370

371
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
16,840✔
372
  connLimitNum = TMAX(connLimitNum, 10);
16,840✔
373
  connLimitNum = TMIN(connLimitNum, 1000);
16,840✔
374
  rpcInit.connLimitNum = connLimitNum;
16,840✔
375
  rpcInit.shareConnLimit = tsShareConnLimit;
16,840✔
376
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
16,840✔
377
  rpcInit.startReadTimer = 1;
16,840✔
378
  rpcInit.readTimeout = tsReadTimeout;
16,840✔
379

380
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
16,840✔
381
  if (TSDB_CODE_SUCCESS != code) {
16,840!
382
    tscError("invalid version string.");
×
383
    return code;
×
384
  }
385

386
  *pDnodeConn = rpcOpen(&rpcInit);
16,840✔
387
  if (*pDnodeConn == NULL) {
16,840!
388
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
389
    code = terrno;
×
390
  }
391

392
  return code;
16,840✔
393
}
394

395
void destroyAllRequests(SHashObj *pRequests) {
29,893✔
396
  void *pIter = taosHashIterate(pRequests, NULL);
29,893✔
397
  while (pIter != NULL) {
29,893!
398
    int64_t *rid = pIter;
×
399

400
    SRequestObj *pRequest = acquireRequest(*rid);
×
401
    if (pRequest) {
×
402
      destroyRequest(pRequest);
×
403
      (void)releaseRequest(*rid);  // ignore error
×
404
    }
405

406
    pIter = taosHashIterate(pRequests, pIter);
×
407
  }
408
}
29,893✔
409

410
void stopAllRequests(SHashObj *pRequests) {
5✔
411
  void *pIter = taosHashIterate(pRequests, NULL);
5✔
412
  while (pIter != NULL) {
5!
413
    int64_t *rid = pIter;
×
414

415
    SRequestObj *pRequest = acquireRequest(*rid);
×
416
    if (pRequest) {
×
417
      taos_stop_query(pRequest);
×
418
      (void)releaseRequest(*rid);  // ignore error
×
419
    }
420

421
    pIter = taosHashIterate(pRequests, pIter);
×
422
  }
423
}
5✔
424

425
void destroyAppInst(void *info) {
16,840✔
426
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
16,840✔
427
  tscDebug("destroy app inst mgr %p", pAppInfo);
16,840✔
428

429
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
16,840✔
430
  if (TSDB_CODE_SUCCESS != code) {
16,840!
431
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
432
  }
433

434
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
16,840✔
435

436
  code = taosThreadMutexUnlock(&appInfo.mutex);
16,840✔
437
  if (TSDB_CODE_SUCCESS != code) {
16,840!
438
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
439
  }
440

441
  taosMemoryFreeClear(pAppInfo->instKey);
16,840!
442
  closeTransporter(pAppInfo);
16,840✔
443

444
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
16,840✔
445
  if (TSDB_CODE_SUCCESS != code) {
16,840!
446
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
447
  }
448

449
  taosArrayDestroy(pAppInfo->pQnodeList);
16,840✔
450
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
16,840✔
451
  if (TSDB_CODE_SUCCESS != code) {
16,840!
452
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
453
  }
454

455
  taosMemoryFree(pAppInfo);
16,840!
456
}
16,840✔
457

458
void destroyTscObj(void *pObj) {
29,891✔
459
  if (NULL == pObj) {
29,891!
460
    return;
×
461
  }
462

463
  STscObj *pTscObj = pObj;
29,891✔
464
  int64_t  tscId = pTscObj->id;
29,891✔
465
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
29,891✔
466

467
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
29,891✔
468
  hbDeregisterConn(pTscObj, connKey);
29,891✔
469

470
  destroyAllRequests(pTscObj->pRequests);
29,893✔
471
  taosHashCleanup(pTscObj->pRequests);
29,893✔
472

473
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
29,893✔
474
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
29,893✔
475
           pTscObj->pAppInfo->numOfConns);
476

477
  // In any cases, we should not free app inst here. Or an race condition rises.
478
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
29,893✔
479

480
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
29,893✔
481
  taosMemoryFree(pTscObj);
29,893!
482

483
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
29,893✔
484
}
485

486
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
30,913✔
487
                     STscObj **pObj) {
488
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
30,913!
489
  if (NULL == *pObj) {
30,913!
490
    return terrno;
×
491
  }
492

493
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
30,913✔
494
  if (NULL == (*pObj)->pRequests) {
30,913!
495
    taosMemoryFree(*pObj);
×
496
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
497
  }
498

499
  (*pObj)->connType = connType;
30,913✔
500
  (*pObj)->pAppInfo = pAppInfo;
30,913✔
501
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
30,913✔
502
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
30,913✔
503
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
30,913✔
504

505
  if (db != NULL) {
30,913!
506
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
30,913✔
507
  }
508

509
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
30,913!
510

511
  int32_t code = TSDB_CODE_SUCCESS;
30,913✔
512

513
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
30,913✔
514
  if ((*pObj)->id < 0) {
30,913!
515
    tscError("failed to add object to clientConnRefPool");
×
516
    code = terrno;
×
517
    taosMemoryFree(*pObj);
×
518
    return code;
×
519
  }
520

521
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
30,913✔
522

523
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
30,913!
524
  return code;
30,912✔
525
}
526

527
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
20,249,246✔
528

529
void releaseTscObj(int64_t rid) {
20,303,107✔
530
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
20,303,107✔
531
  if (TSDB_CODE_SUCCESS != code) {
20,303,516!
532
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
533
  }
534
}
20,303,516✔
535

536
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
11,028,281✔
537
  int32_t code = TSDB_CODE_SUCCESS;
11,028,281✔
538
  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
11,028,281!
539
  if (NULL == *pRequest) {
11,037,576!
540
    return terrno;
×
541
  }
542

543
  STscObj *pTscObj = acquireTscObj(connId);
11,037,576✔
544
  if (pTscObj == NULL) {
11,041,584!
545
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
×
546
  }
547
  SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
11,041,584!
548
  if (interParam == NULL) {
11,039,746!
549
    releaseTscObj(connId);
×
550
    TSC_ERR_JRET(terrno);
×
551
  }
552
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
11,039,746!
553
  interParam->pRequest = *pRequest;
11,038,476✔
554
  (*pRequest)->body.interParam = interParam;
11,038,476✔
555

556
  (*pRequest)->resType = RES_TYPE__QUERY;
11,038,476✔
557
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
11,038,476!
558
  (*pRequest)->metric.start = taosGetTimestampUs();
11,044,797✔
559

560
  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
11,040,523✔
561
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
11,040,523✔
562
  (*pRequest)->type = type;
11,040,523✔
563
  (*pRequest)->allocatorRefId = -1;
11,040,523✔
564

565
  (*pRequest)->pDb = getDbOfConnection(pTscObj);
11,040,523✔
566
  if (NULL == (*pRequest)->pDb) {
11,044,276✔
567
    TSC_ERR_JRET(terrno);
347,441!
568
  }
569
  (*pRequest)->pTscObj = pTscObj;
11,044,276✔
570
  (*pRequest)->inCallback = false;
11,044,276✔
571
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
11,044,276!
572
  if (NULL == (*pRequest)->msgBuf) {
11,034,388!
573
    code = terrno;
×
574
    goto _return;
×
575
  }
576
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
11,034,388✔
577
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
11,034,388!
578
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));
11,035,944!
579

580
  return TSDB_CODE_SUCCESS;
11,041,740✔
581
_return:
×
582
  if ((*pRequest)->pTscObj) {
×
583
    doDestroyRequest(*pRequest);
×
584
  } else {
585
    taosMemoryFree(*pRequest);
×
586
  }
587
  return code;
×
588
}
589

590
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
11,446,467✔
591
  taosMemoryFreeClear(pResInfo->pRspMsg);
11,446,467!
592
  taosMemoryFreeClear(pResInfo->length);
11,446,480!
593
  taosMemoryFreeClear(pResInfo->row);
11,446,479!
594
  taosMemoryFreeClear(pResInfo->pCol);
11,446,482!
595
  taosMemoryFreeClear(pResInfo->fields);
11,446,486!
596
  taosMemoryFreeClear(pResInfo->userFields);
11,446,488!
597
  taosMemoryFreeClear(pResInfo->convertJson);
11,446,483!
598
  taosMemoryFreeClear(pResInfo->decompBuf);
11,446,483!
599

600
  if (pResInfo->convertBuf != NULL) {
11,446,483✔
601
    for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
4,923,570✔
602
      taosMemoryFreeClear(pResInfo->convertBuf[i]);
3,693,476!
603
    }
604
    taosMemoryFreeClear(pResInfo->convertBuf);
1,230,094!
605
  }
606
}
11,446,493✔
607

608
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
47,839,960✔
609

610
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
48,539,489✔
611

612
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
11,013,938✔
613

614
/// return the most previous req ref id
615
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
11,017,246✔
616
  int64_t      mostPrevReqRefId = pRequest->self;
11,017,246✔
617
  SRequestObj *pTmp = pRequest;
11,017,246✔
618
  while (pTmp->relation.prevRefId) {
11,017,731✔
619
    pTmp = acquireRequest(pTmp->relation.prevRefId);
485✔
620
    if (pTmp) {
485!
621
      mostPrevReqRefId = pTmp->self;
485✔
622
      (void)releaseRequest(mostPrevReqRefId);  // ignore error
485✔
623
    } else {
624
      break;
×
625
    }
626
  }
627
  (void)removeRequest(mostPrevReqRefId);  // ignore error
11,017,246✔
628
  return mostPrevReqRefId;
11,033,834✔
629
}
630

631
void destroyNextReq(int64_t nextRefId) {
11,043,442✔
632
  if (nextRefId) {
11,043,442✔
633
    SRequestObj *pObj = acquireRequest(nextRefId);
509✔
634
    if (pObj) {
509!
635
      (void)releaseRequest(nextRefId);  // ignore error
509✔
636
      (void)releaseRequest(nextRefId);  // ignore error
509✔
637
    }
638
  }
639
}
11,043,442✔
640

641
void destroySubRequests(SRequestObj *pRequest) {
×
642
  int32_t      reqIdx = -1;
×
643
  SRequestObj *pReqList[16] = {NULL};
×
644
  uint64_t     tmpRefId = 0;
×
645

646
  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
×
647
    return;
×
648
  }
649

650
  SRequestObj *pTmp = pRequest;
×
651
  while (pTmp->relation.prevRefId) {
×
652
    tmpRefId = pTmp->relation.prevRefId;
×
653
    pTmp = acquireRequest(tmpRefId);
×
654
    if (pTmp) {
×
655
      pReqList[++reqIdx] = pTmp;
×
656
      (void)releaseRequest(tmpRefId);  // ignore error
×
657
    } else {
658
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
×
659
      break;
×
660
    }
661
  }
662

663
  for (int32_t i = reqIdx; i >= 0; i--) {
×
664
    (void)removeRequest(pReqList[i]->self);  // ignore error
×
665
  }
666

667
  tmpRefId = pRequest->relation.nextRefId;
×
668
  while (tmpRefId) {
×
669
    pTmp = acquireRequest(tmpRefId);
×
670
    if (pTmp) {
×
671
      tmpRefId = pTmp->relation.nextRefId;
×
672
      (void)removeRequest(pTmp->self);   // ignore error
×
673
      (void)releaseRequest(pTmp->self);  // ignore error
×
674
    } else {
675
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
×
676
      break;
×
677
    }
678
  }
679
}
680

681
void doDestroyRequest(void *p) {
11,035,064✔
682
  if (NULL == p) {
11,035,064!
683
    return;
×
684
  }
685

686
  SRequestObj *pRequest = (SRequestObj *)p;
11,035,064✔
687

688
  uint64_t reqId = pRequest->requestId;
11,035,064✔
689
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
11,035,064✔
690

691
  int64_t nextReqRefId = pRequest->relation.nextRefId;
11,035,064✔
692

693
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
11,035,064✔
694
  if (TSDB_CODE_SUCCESS != code) {
11,023,869✔
695
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
30,913✔
696
  }
697
  schedulerFreeJob(&pRequest->body.queryJob, 0);
11,023,869✔
698

699
  destorySqlCallbackWrapper(pRequest->pWrapper);
11,020,494✔
700

701
  taosMemoryFreeClear(pRequest->msgBuf);
11,019,940!
702

703
  doFreeReqResultInfo(&pRequest->body.resInfo);
11,024,678✔
704
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
11,020,653!
705
    tscError("failed to destroy semaphore");
×
706
  }
707

708
  taosArrayDestroy(pRequest->tableList);
11,013,999✔
709
  taosArrayDestroy(pRequest->targetTableList);
11,007,488✔
710
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
11,006,075✔
711

712
  if (pRequest->self) {
11,034,648!
713
    deregisterRequest(pRequest);
11,035,203✔
714
  }
715

716
  taosMemoryFreeClear(pRequest->pDb);
11,048,637!
717
  taosArrayDestroy(pRequest->dbList);
11,048,609✔
718
  if (pRequest->body.interParam) {
11,049,226!
719
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
11,049,243!
720
      tscError("failed to destroy semaphore in pRequest");
×
721
    }
722
  }
723
  taosMemoryFree(pRequest->body.interParam);
11,049,214!
724

725
  qDestroyQuery(pRequest->pQuery);
11,049,270✔
726
  nodesDestroyAllocator(pRequest->allocatorRefId);
11,043,038✔
727

728
  taosMemoryFreeClear(pRequest->effectiveUser);
11,039,307!
729
  taosMemoryFreeClear(pRequest->sqlstr);
11,039,307!
730
  taosMemoryFree(pRequest);
11,044,659!
731
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
11,045,741✔
732
  destroyNextReq(nextReqRefId);
11,045,741✔
733
}
734

735
void destroyRequest(SRequestObj *pRequest) {
11,022,054✔
736
  if (pRequest == NULL) return;
11,022,054!
737

738
  taos_stop_query(pRequest);
11,022,054✔
739
  (void)removeFromMostPrevReq(pRequest);
11,024,656✔
740
}
741

742
void taosStopQueryImpl(SRequestObj *pRequest) {
11,028,084✔
743
  pRequest->killed = true;
11,028,084✔
744

745
  // It is not a query, no need to stop.
746
  if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
11,028,084✔
747
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
330,655✔
748
    return;
343,274✔
749
  }
750

751
  schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
10,697,429✔
752
  tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
10,683,120✔
753
}
754

755
void stopAllQueries(SRequestObj *pRequest) {
11,022,346✔
756
  int32_t      reqIdx = -1;
11,022,346✔
757
  SRequestObj *pReqList[16] = {NULL};
11,022,346✔
758
  uint64_t     tmpRefId = 0;
11,022,346✔
759

760
  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
11,022,346!
761
    return;
×
762
  }
763

764
  SRequestObj *pTmp = pRequest;
11,022,346✔
765
  while (pTmp->relation.prevRefId) {
11,022,831✔
766
    tmpRefId = pTmp->relation.prevRefId;
485✔
767
    pTmp = acquireRequest(tmpRefId);
485✔
768
    if (pTmp) {
485!
769
      pReqList[++reqIdx] = pTmp;
485✔
770
    } else {
771
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
×
772
      break;
×
773
    }
774
  }
775

776
  for (int32_t i = reqIdx; i >= 0; i--) {
11,031,695✔
777
    taosStopQueryImpl(pReqList[i]);
485✔
778
    (void)releaseRequest(pReqList[i]->self);  // ignore error
485✔
779
  }
780

781
  taosStopQueryImpl(pRequest);
11,031,210✔
782

783
  tmpRefId = pRequest->relation.nextRefId;
11,026,132✔
784
  while (tmpRefId) {
11,023,656!
785
    pTmp = acquireRequest(tmpRefId);
×
786
    if (pTmp) {
×
787
      tmpRefId = pTmp->relation.nextRefId;
×
788
      taosStopQueryImpl(pTmp);
×
789
      (void)releaseRequest(pTmp->self);  // ignore error
×
790
    } else {
791
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
×
792
      break;
×
793
    }
794
  }
795
}
796
#ifdef USE_REPORT
797
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
×
798

799
static void *tscCrashReportThreadFp(void *param) {
×
800
  int32_t code = 0;
×
801
  setThreadName("client-crashReport");
×
802
  char filepath[PATH_MAX] = {0};
×
803
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
804
  char     *pMsg = NULL;
×
805
  int64_t   msgLen = 0;
×
806
  TdFilePtr pFile = NULL;
×
807
  bool      truncateFile = false;
×
808
  int32_t   sleepTime = 200;
×
809
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
810
  int32_t   loopTimes = reportPeriodNum;
×
811

812
#ifdef WINDOWS
813
  if (taosCheckCurrentInDll()) {
814
    atexit(crashReportThreadFuncUnexpectedStopped);
815
  }
816
#endif
817

818
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
819
    return NULL;
×
820
  }
821
  STelemAddrMgmt mgt;
822
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
823
  if (code) {
×
824
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
825
    return NULL;
×
826
  }
827

828
  code = initCrashLogWriter();
×
829
  if (code) {
×
830
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
831
    return NULL;
×
832
  }
833

834
  while (1) {
835
    checkAndPrepareCrashInfo();
×
836
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
837
    if (loopTimes++ < reportPeriodNum) {
×
838
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
839
      taosMsleep(sleepTime);
×
840
      continue;
×
841
    }
842

843
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
844
    if (pMsg && msgLen > 0) {
×
845
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
846
        tscError("failed to send crash report");
×
847
        if (pFile) {
×
848
          taosReleaseCrashLogFile(pFile, false);
×
849
          pFile = NULL;
×
850

851
          taosMsleep(sleepTime);
×
852
          loopTimes = 0;
×
853
          continue;
×
854
        }
855
      } else {
856
        tscInfo("succeed to send crash report");
×
857
        truncateFile = true;
×
858
      }
859
    } else {
860
      tscInfo("no crash info was found");
×
861
    }
862

863
    taosMemoryFree(pMsg);
×
864

865
    if (pMsg && msgLen > 0) {
×
866
      pMsg = NULL;
×
867
      continue;
×
868
    }
869

870
    if (pFile) {
×
871
      taosReleaseCrashLogFile(pFile, truncateFile);
×
872
      pFile = NULL;
×
873
      truncateFile = false;
×
874
    }
875

876
    taosMsleep(sleepTime);
×
877
    loopTimes = 0;
×
878
  }
879
  taosTelemetryDestroy(&mgt);
×
880

881
  clientStop = -2;
×
882
  return NULL;
×
883
}
884

885
int32_t tscCrashReportInit() {
16,580✔
886
  if (!tsEnableCrashReport) {
16,580!
887
    return TSDB_CODE_SUCCESS;
16,580✔
888
  }
889
  int32_t      code = TSDB_CODE_SUCCESS;
×
890
  TdThreadAttr thAttr;
891
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
892
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
893
  TdThread crashReportThread;
894
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
895
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
896
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
897
    TSC_ERR_RET(terrno);
×
898
  }
899

900
  (void)taosThreadAttrDestroy(&thAttr);
×
901
_return:
×
902
  if (code) {
×
903
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
904
    TSC_ERR_RET(terrno);
×
905
  }
906

907
  return code;
×
908
}
909

910
void tscStopCrashReport() {
16,581✔
911
  if (!tsEnableCrashReport) {
16,581!
912
    return;
16,581✔
913
  }
914

915
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
916
    tscDebug("crash report thread already stopped");
×
917
    return;
×
918
  }
919

920
  while (atomic_load_32(&clientStop) > 0) {
×
921
    taosMsleep(100);
×
922
  }
923
}
924

925
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
926
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
927
}
×
928
#endif
929

930
#ifdef TAOSD_INTEGRATED
931
typedef struct {
932
  TdThread pid;
933
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
934
} SDaemonObj;
935

936
extern int  dmStartDaemon(int argc, char const *argv[]);
937
extern void dmStopDaemon();
938

939
SDaemonObj daemonObj = {0};
940

941
typedef struct {
942
  int32_t argc;
943
  char  **argv;
944
} SExecArgs;
945

946
static void *dmStartDaemonFunc(void *param) {
947
  int32_t    code = 0;
948
  SExecArgs *pArgs = (SExecArgs *)param;
949
  int32_t    argc = pArgs->argc;
950
  char     **argv = pArgs->argv;
951

952
  code = dmStartDaemon(argc, (const char **)argv);
953
  if (code != 0) {
954
    printf("failed to start taosd since %s\r\n", tstrerror(code));
955
    goto _exit;
956
  }
957

958
_exit:
959
  if (code != 0) {
960
    atomic_store_32(&daemonObj.stat, code);
961
  }
962
  return NULL;
963
}
964

965
static int32_t shellStartDaemon(int argc, char *argv[]) {
966
  int32_t    code = 0, lino = 0;
967
  SExecArgs *pArgs = NULL;
968
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
969

970
  TdThreadAttr thAttr;
971
  (void)taosThreadAttrInit(&thAttr);
972
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
973
#ifdef TD_COMPACT_OS
974
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
975
#endif
976
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
977
  if (pArgs == NULL) {
978
    code = terrno;
979
    TAOS_CHECK_EXIT(code);
980
  }
981
  pArgs->argc = argc;
982
  pArgs->argv = argv;
983

984
#ifndef TD_AS_LIB
985
  tsLogEmbedded = 1;
986
#endif
987

988
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
989

990
  while (true) {
991
    if (atomic_load_64(&tsDndStart)) {
992
      atomic_store_32(&daemonObj.stat, 1);
993
      break;
994
    }
995
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
996
    if (daemonstat < 0) {
997
      code = daemonstat;
998
      TAOS_CHECK_EXIT(code);
999
    }
1000

1001
    if (daemonstat > 1) {
1002
      code = TSDB_CODE_APP_ERROR;
1003
      TAOS_CHECK_EXIT(code);
1004
    }
1005
    taosMsleep(1000);
1006
  }
1007

1008
_exit:
1009
  endMs = taosGetTimestampMs();
1010
  (void)taosThreadAttrDestroy(&thAttr);
1011
  taosMemoryFreeClear(pArgs);
1012
  if (code) {
1013
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1014
           endMs - startMs);
1015
  } else {
1016
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1017
  }
1018
#ifndef TD_AS_LIB
1019
  tsLogEmbedded = 0;
1020
#endif
1021
  return code;
1022
}
1023

1024
void shellStopDaemon() {
1025
#ifndef TD_AS_LIB
1026
  tsLogEmbedded = 1;
1027
#endif
1028
  dmStopDaemon();
1029
  if (taosCheckPthreadValid(daemonObj.pid)) {
1030
    (void)taosThreadJoin(daemonObj.pid, NULL);
1031
    taosThreadClear(&daemonObj.pid);
1032
  }
1033
}
1034
#endif
1035

1036
void taos_init_imp(void) {
16,581✔
1037
#if defined(LINUX)
1038
  if (tscDbg.memEnable) {
16,581!
1039
    int32_t code = taosMemoryDbgInit();
×
1040
    if (code) {
×
1041
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
1042
    } else {
1043
      tsAsyncLog = false;
×
1044
      (void)printf("memory dbg enabled\n");
×
1045
    }
1046
  }
1047
#endif
1048

1049
  // In the APIs of other program language, taos_cleanup is not available yet.
1050
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
1051
  (void)atexit(taos_cleanup);
16,581✔
1052
  SET_ERRNO(TSDB_CODE_SUCCESS);
16,581✔
1053
  terrno = TSDB_CODE_SUCCESS;
16,581✔
1054
  taosSeedRand(taosGetTimestampSec());
16,581✔
1055

1056
  appInfo.pid = taosGetPId();
16,581✔
1057
  appInfo.startTime = taosGetTimestampMs();
16,581✔
1058
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
16,581✔
1059
  appInfo.pInstMapByClusterId =
16,581✔
1060
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
16,581✔
1061
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
16,581!
1062
    (void)printf("failed to allocate memory when init appInfo\n");
×
1063
    tscInitRes = terrno;
×
1064
    return;
1✔
1065
  }
1066
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
16,581✔
1067

1068
  const char *logName = CUS_PROMPT "log";
16,581✔
1069
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
16,581!
1070
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
16,581✔
1071
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
1✔
1072
    tscInitRes = terrno;
1✔
1073
    return;
1✔
1074
  }
1075

1076
#ifdef TAOSD_INTEGRATED
1077
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
1078
#else
1079
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
16,580!
1080
#endif
1081

1082
  initQueryModuleMsgHandle();
16,580✔
1083
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
1084
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
16,580!
1085
    tscInitRes = terrno;
×
1086
    tscError("failed to init conv");
×
1087
    return;
×
1088
  }
1089
#endif
1090
#if !defined(WINDOWS) && !defined(TD_ASTRA)
1091
  ENV_ERR_RET(tzInit(), "failed to init timezone");
16,580!
1092
#endif
1093
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
16,580!
1094
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
16,580!
1095

1096
  if (InitRegexCache() != 0) {
16,580!
1097
    tscInitRes = terrno;
×
1098
    (void)printf("failed to init regex cache\n");
×
1099
    return;
×
1100
  }
1101

1102
  tscInfo("starting to initialize TAOS driver");
16,580!
1103

1104
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
16,580✔
1105
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
16,580!
1106
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
16,580!
1107
  ENV_ERR_RET(initClientId(), "failed to init clientId");
16,580!
1108

1109
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
16,580!
1110
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
16,580!
1111
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
16,580!
1112

1113
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
16,580✔
1114
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
16,580✔
1115

1116
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
16,580!
1117
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
16,580!
1118
#ifdef USE_REPORT  
1119
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
16,580!
1120
#endif
1121
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
16,580!
1122
#ifdef TAOSD_INTEGRATED
1123
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
1124
#endif
1125
  tscInfo("TAOS driver is initialized successfully");
16,580!
1126
}
1127

1128
int taos_init() {
20,878✔
1129
  (void)taosThreadOnce(&tscinit, taos_init_imp);
20,878✔
1130
  return tscInitRes;
20,877✔
1131
}
1132

1133
const char *getCfgName(TSDB_OPTION option) {
542✔
1134
  const char *name = NULL;
542✔
1135

1136
  switch (option) {
542!
1137
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
×
1138
      name = "shellActivityTimer";
×
1139
      break;
×
1140
    case TSDB_OPTION_LOCALE:
×
1141
      name = "locale";
×
1142
      break;
×
1143
    case TSDB_OPTION_CHARSET:
×
1144
      name = "charset";
×
1145
      break;
×
1146
    case TSDB_OPTION_TIMEZONE:
19✔
1147
      name = "timezone";
19✔
1148
      break;
19✔
1149
    case TSDB_OPTION_USE_ADAPTER:
×
1150
      name = "useAdapter";
×
1151
      break;
×
1152
    default:
523✔
1153
      break;
523✔
1154
  }
1155

1156
  return name;
542✔
1157
}
1158

1159
int taos_options_imp(TSDB_OPTION option, const char *str) {
2,752✔
1160
  if (option == TSDB_OPTION_CONFIGDIR) {
2,752✔
1161
#ifndef WINDOWS
1162
    char newstr[PATH_MAX];
1163
    int  len = strlen(str);
2,210✔
1164
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
2,210!
1165
      if (len + 2 >= PATH_MAX) {
2,205!
1166
        tscError("Too long path %s", str);
×
1167
        return -1;
×
1168
      }
1169
      newstr[0] = '"';
2,205✔
1170
      (void)memcpy(newstr + 1, str, len);
2,205✔
1171
      newstr[len + 1] = '"';
2,205✔
1172
      newstr[len + 2] = '\0';
2,205✔
1173
      str = newstr;
2,205✔
1174
    }
1175
#endif
1176
    tstrncpy(configDir, str, PATH_MAX);
2,210✔
1177
    tscInfo("set cfg:%s to %s", configDir, str);
2,210!
1178
    return 0;
2,210✔
1179
  }
1180

1181
  // initialize global config
1182
  if (taos_init() != 0) {
542!
1183
    return -1;
×
1184
  }
1185

1186
  SConfig     *pCfg = taosGetCfg();
542✔
1187
  SConfigItem *pItem = NULL;
542✔
1188
  const char  *name = getCfgName(option);
542✔
1189

1190
  if (name == NULL) {
542✔
1191
    tscError("Invalid option %d", option);
523!
1192
    return -1;
523✔
1193
  }
1194

1195
  pItem = cfgGetItem(pCfg, name);
19✔
1196
  if (pItem == NULL) {
19!
1197
    tscError("Invalid option %d", option);
×
1198
    return -1;
×
1199
  }
1200

1201
  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
19✔
1202
  if (code != 0) {
19!
1203
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
×
1204
  } else {
1205
    tscInfo("set cfg:%s to %s", name, str);
19!
1206
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
19!
1207
      code = taosCfgDynamicOptions(pCfg, name, false);
×
1208
    }
1209
  }
1210

1211
  return code;
19✔
1212
}
1213

1214
/**
1215
 * The request id is an unsigned integer format of 64bit.
1216
 *+------------+-----+-----------+---------------+
1217
 *| uid|localIp| PId | timestamp | serial number |
1218
 *+------------+-----+-----------+---------------+
1219
 *| 12bit      |12bit|24bit      |16bit          |
1220
 *+------------+-----+-----------+---------------+
1221
 * @return
1222
 */
1223
uint64_t generateRequestId() {
15,783,069✔
1224
  static uint32_t hashId = 0;
1225
  static int32_t  requestSerialId = 0;
1226

1227
  if (hashId == 0) {
15,783,069✔
1228
    int32_t code = taosGetSystemUUIDU32(&hashId);
16,570✔
1229
    if (code != TSDB_CODE_SUCCESS) {
16,570!
1230
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1231
               tstrerror(code));
1232
    }
1233
  }
1234

1235
  uint64_t id = 0;
15,784,144✔
1236

1237
  while (true) {
×
1238
    int64_t  ts = taosGetTimestampMs();
15,778,457✔
1239
    uint64_t pid = taosGetPId();
15,778,457✔
1240
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
15,778,525✔
1241
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
15,793,957✔
1242

1243
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
15,795,346✔
1244
    if (id) {
15,795,346!
1245
      break;
15,795,346✔
1246
    }
1247
  }
1248
  return id;
15,795,346✔
1249
}
1250

1251
#if 0
1252
#include "cJSON.h"
1253
static setConfRet taos_set_config_imp(const char *config){
1254
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1255
  static bool setConfFlag = false;
1256
  if (setConfFlag) {
1257
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1258
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1259
    return ret;
1260
  }
1261
  taosInitGlobalCfg();
1262
  cJSON *root = cJSON_Parse(config);
1263
  if (root == NULL){
1264
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1265
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1266
    return ret;
1267
  }
1268

1269
  int size = cJSON_GetArraySize(root);
1270
  if(!cJSON_IsObject(root) || size == 0) {
1271
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1272
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1273
    return ret;
1274
  }
1275

1276
  if(size >= 1000) {
1277
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1278
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1279
    return ret;
1280
  }
1281

1282
  for(int i = 0; i < size; i++){
1283
    cJSON *item = cJSON_GetArrayItem(root, i);
1284
    if(!item) {
1285
      ret.retCode = SET_CONF_RET_ERR_INNER;
1286
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1287
      return ret;
1288
    }
1289
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1290
      ret.retCode = SET_CONF_RET_ERR_PART;
1291
      if (strlen(ret.retMsg) == 0){
1292
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1293
      }else{
1294
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1295
        size_t leftSize = tmp >= 0 ? tmp : 0;
1296
        strncat(ret.retMsg, "|",  leftSize);
1297
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1298
        leftSize = tmp >= 0 ? tmp : 0;
1299
        strncat(ret.retMsg, item->string, leftSize);
1300
      }
1301
    }
1302
  }
1303
  cJSON_Delete(root);
1304
  setConfFlag = true;
1305
  return ret;
1306
}
1307

1308
setConfRet taos_set_config(const char *config){
1309
  taosThreadMutexLock(&setConfMutex);
1310
  setConfRet ret = taos_set_config_imp(config);
1311
  taosThreadMutexUnlock(&setConfMutex);
1312
  return ret;
1313
}
1314
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc