• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.25
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42

43
#define TSC_VAR_NOT_RELEASE 1
44
#define TSC_VAR_RELEASED    0
45

46
// Guard for JSON-building code: when `c` evaluates to false, log the failure, store
// TSDB_CODE_TSC_FAIL_GENERATE_JSON in the caller's local `code` and jump to the caller's
// `_end` label. The argument is parenthesized so that compound expressions are negated
// as a whole.
#define ENV_JSON_FALSE_CHECK(c)                      \
  do {                                               \
    if (!(c)) {                                      \
      tscError("failed to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;       \
      goto _end;                                     \
    }                                                \
  } while (0)
54

55
// Evaluate `c` once; on anything other than TSDB_CODE_SUCCESS record the value in both
// terrno and tscInitRes, log `info` and return from the enclosing void function.
#define ENV_ERR_RET(c, info)             \
  do {                                   \
    int32_t envRc_ = (c);                \
    if (TSDB_CODE_SUCCESS == envRc_) {   \
      break;                             \
    }                                    \
    terrno = envRc_;                     \
    tscInitRes = envRc_;                 \
    tscError(info);                      \
    return;                              \
  } while (0)
65

66
STscDbg   tscDbg = {0};
67
SAppInfo  appInfo;
68
int64_t   lastClusterId = 0;
69
int32_t   clientReqRefPool = -1;
70
int32_t   clientConnRefPool = -1;
71
int32_t   clientStop = -1;
72
SHashObj *pTimezoneMap = NULL;
73

74
int32_t timestampDeltaLimit = 900;  // s
75

76
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
77
volatile int32_t    tscInitRes = 0;
78

79
// Registers a newly created request.
//
// The request is added to clientReqRefPool and the resulting ref id is stored in
// pRequest->self. On success the per-connection counter (numOfReqs) is incremented and,
// when the connection has an app instance, the app-level totalRequests/currentRequests
// counters as well.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the request could not be added to the ref
// pool; in that case no counter has been modified.
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // Keep the full 64-bit counter values: narrowing them to int32_t would print
    // wrapped/negative numbers once a counter exceeds INT32_MAX.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ", QID:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}
103

104
// Joins the db names stored in `list` into `buf` as a comma separated string.
//
// For each entry, everything up to and including the first '.' is skipped, so only the
// part after the dot is emitted. At most `size` bytes of `buf` are used, including the
// terminating NUL; when the names do not fit, the result is truncated and a message is
// logged. `buf` is left untouched when `list` has no entries.
static void concatStrings(SArray *list, char *buf, int size) {
  if (buf == NULL || size <= 0) {
    return;
  }

  int len = 0;
  int num = (int)taosArrayGetSize(list);
  for (int i = 0; i < num; i++) {
    char *db = taosArrayGet(list, i);
    if (NULL == db) {
      tscError("get dbname failed, buf:%s", buf);
      break;
    }
    char *dot = strchr(db, '.');
    if (dot != NULL) {
      db = dot + 1;
    }
    if (i != 0) {
      // Only account for the separator when it was really written; otherwise `len`
      // would run ahead of the actual string and the next write would leave a gap.
      if (len >= size - 1) {
        tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
        break;
      }
      buf[len++] = ',';
      buf[len] = '\0';
    }
    int ret = tsnprintf(buf + len, size - len, "%s", db);
    if (ret < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, ret);
      break;
    }
    len += ret;
    if (len >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
      break;
    }
  }
}
×
132
#ifdef USE_REPORT
133
// Builds the JSON description of a slow request and hands it to the monitor queue.
//
// The object carries cluster id, start timestamp, request id, duration, result code,
// request type, row count, the (possibly truncated) SQL text, user, process name/id,
// local FQDN and the db name(s). The SQL text is cut at tsSlowLogMaxLen for the report
// only; pRequest->sqlstr is always restored before returning.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the JSON object cannot be
// created, TSDB_CODE_TSC_FAIL_GENERATE_JSON when an item cannot be added,
// TSDB_CODE_FAILED on formatting/printing errors, or the error reported by
// monitorPutData2MonitorQueue().
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("failed to create monitor json");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SAppInstInfo *pAppInfo = pTscObj->pAppInfo;

  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
  if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Temporarily terminate the statement at the configured maximum length. The saved
    // character is put back BEFORE the result is checked, so a failure to add the item
    // can no longer leave the request's SQL text permanently truncated.
    char tmp = pRequest->sqlstr[pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    bool added = cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
    pRequest->sqlstr[pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(added);
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  MonitorSlowLogData data = {0};
  data.clusterId = pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // The printed text is released here only when the queue rejected it.
    taosMemoryFree(value);
    goto _end;
  }

_end:
  cJSON_Delete(json);
  return code;
}
221
#endif
222
// Tells whether a request is eligible for slow-log reporting with respect to the
// excluded database name `exceptDb`.
//
// When the request has a current db (pDb), the answer is simply "pDb differs from
// exceptDb". Otherwise every entry of dbList is inspected (the part after the first '.'
// when there is one); the request is rejected as soon as one entry equals exceptDb or
// an entry cannot be fetched. Returns true when nothing matched.
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (NULL == pRequest->pDb) {
    int idx = 0;
    while (idx < taosArrayGetSize(pRequest->dbList)) {
      char *name = taosArrayGet(pRequest->dbList, idx);
      if (name == NULL) {
        tscError("get dbname failed, exceptDb:%s", exceptDb);
        return false;
      }

      char *sep = strchr(name, '.');
      char *bare = (sep == NULL) ? name : sep + 1;
      if (0 == strcmp(bare, exceptDb)) {
        return false;
      }
      idx++;
    }
    return true;
  }

  return 0 != strcmp(pRequest->pDb, exceptDb);
}
243

244
// Undoes registerRequest() for a request that is being destroyed.
//
// Decrements the app-level currentRequests and the connection-level numOfReqs counters,
// adds the elapsed time to the insert/query statistics depending on the statement type,
// and, when built with USE_REPORT, emits monitoring and slow-log records. Finally the
// connection reference is released through releaseTscObj().
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
           ", elapsed:%.2f ms, current:%d, app current:%d",
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the allocator is acquired.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

#ifdef USE_REPORT
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // Convert the threshold to microseconds in signed 64-bit arithmetic. Multiplying by an
  // `unsigned long` constant would turn the comparison unsigned (a negative duration
  // would then look huge) and could wrap where `unsigned long` is only 32 bits wide.
  int64_t slowThresholdUs = (int64_t)pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000LL;
  if ((duration >= slowThresholdUs) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }
#endif

  releaseTscObj(pTscObj->id);
}
319

320
// todo close the transporter properly
321
// Closes the RPC transporter of an app instance via rpcClose(). Does nothing when the
// instance or its transporter is NULL.
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
329

330
// Callback installed as SRpcInit.rfp by openTransporter(): returns true when a message
// that failed with `code` should be retried.
//
// Redirect-class errors (NEED_REDIRECT_ERROR) are retried for every message type except
// the scheduler ones listed below. A fixed set of queue-memory and sync errors is
// retried for any message type. Everything else is not retried.
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    bool isSchedulerMsg = (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                           msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                           msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK ||
                           msgType == TDMT_SCH_TASK_NOTIFY);
    return !isSchedulerMsg;
  }

  bool retryable = (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY || code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE ||
                    code == TSDB_CODE_SYN_WRITE_STALL || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
                    code == TSDB_CODE_SYN_RESTORING);
  if (!retryable) {
    return false;
  }

  tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
  return true;
}
347

348
// start timer for particular msgType
349
// Returns true only for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE messages; `code` is
// not consulted.
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (msgType == TDMT_VND_SUBMIT) || (msgType == TDMT_VND_CREATE_TABLE);
}
355

356
// TODO refactor
357
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
2,072✔
358
  SRpcInit rpcInit;
359
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
2,072✔
360
  rpcInit.localPort = 0;
2,072✔
361
  rpcInit.label = "TSC";
2,072✔
362
  rpcInit.numOfThreads = tsNumOfRpcThreads;
2,072✔
363
  rpcInit.cfp = processMsgFromServer;
2,072✔
364
  rpcInit.rfp = clientRpcRfp;
2,072✔
365
  rpcInit.sessions = 1024;
2,072✔
366
  rpcInit.connType = TAOS_CONN_CLIENT;
2,072✔
367
  rpcInit.user = (char *)user;
2,072✔
368
  rpcInit.idleTime = tsShellActivityTimer * 1000;
2,072✔
369
  rpcInit.compressSize = tsCompressMsgSize;
2,072✔
370
  rpcInit.dfp = destroyAhandle;
2,072✔
371

372
  rpcInit.retryMinInterval = tsRedirectPeriod;
2,072✔
373
  rpcInit.retryStepFactor = tsRedirectFactor;
2,072✔
374
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
2,072✔
375
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
2,072✔
376

377
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
2,072✔
378
  connLimitNum = TMAX(connLimitNum, 10);
2,072✔
379
  connLimitNum = TMIN(connLimitNum, 1000);
2,072✔
380
  rpcInit.connLimitNum = connLimitNum;
2,072✔
381
  rpcInit.shareConnLimit = tsShareConnLimit;
2,072✔
382
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
2,072✔
383
  rpcInit.startReadTimer = 1;
2,072✔
384
  rpcInit.readTimeout = tsReadTimeout;
2,072✔
385
  rpcInit.ipv6 = tsEnableIpv6;
2,072✔
386

387
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
2,072✔
388
  if (TSDB_CODE_SUCCESS != code) {
2,072!
389
    tscError("invalid version string.");
×
390
    return code;
×
391
  }
392

393
  tscInfo("rpc max retry timeout %" PRId64 "", rpcInit.retryMaxTimeout);
2,072!
394
  *pDnodeConn = rpcOpen(&rpcInit);
2,072✔
395
  if (*pDnodeConn == NULL) {
2,072!
396
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
397
    code = terrno;
×
398
  }
399

400
  return code;
2,072✔
401
}
402

403
// Walks the request hash (entries are request ref ids) and destroys every request that
// can still be acquired from the request ref pool.
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
6,247✔
417

418
// Walks the request hash (entries are request ref ids) and calls taos_stop_query() on
// every request that can still be acquired from the request ref pool.
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
5✔
432

433
void destroyAppInst(void *info) {
2,072✔
434
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
2,072✔
435
  tscInfo("destroy app inst mgr %p", pAppInfo);
2,072!
436

437
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
2,072✔
438
  if (TSDB_CODE_SUCCESS != code) {
2,072!
439
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
440
  }
441

442
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
2,072✔
443

444
  code = taosThreadMutexUnlock(&appInfo.mutex);
2,072✔
445
  if (TSDB_CODE_SUCCESS != code) {
2,072!
446
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
447
  }
448

449
  taosMemoryFreeClear(pAppInfo->instKey);
2,072!
450
  closeTransporter(pAppInfo);
2,072✔
451

452
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
2,072✔
453
  if (TSDB_CODE_SUCCESS != code) {
2,072!
454
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
455
  }
456

457
  taosArrayDestroy(pAppInfo->pQnodeList);
2,072✔
458
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
2,072✔
459
  if (TSDB_CODE_SUCCESS != code) {
2,072!
460
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
461
  }
462

463
  taosMemoryFree(pAppInfo);
2,072!
464
}
2,072✔
465

466
void destroyTscObj(void *pObj) {
6,246✔
467
  if (NULL == pObj) {
6,246!
468
    return;
×
469
  }
470

471
  STscObj *pTscObj = pObj;
6,246✔
472
  int64_t  tscId = pTscObj->id;
6,246✔
473
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
6,246!
474

475
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
6,246✔
476
  hbDeregisterConn(pTscObj, connKey);
6,246✔
477

478
  destroyAllRequests(pTscObj->pRequests);
6,247✔
479
  taosHashCleanup(pTscObj->pRequests);
6,247✔
480

481
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
6,247✔
482
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
6,247✔
483
           pTscObj->pAppInfo->numOfConns);
484

485
  // In any cases, we should not free app inst here. Or an race condition rises.
486
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
6,247✔
487

488
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
6,247✔
489
  taosMemoryFree(pTscObj);
6,247!
490

491
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
6,247!
492
}
493

494
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
7,008✔
495
                     STscObj **pObj) {
496
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
7,008!
497
  if (NULL == *pObj) {
7,008!
498
    return terrno;
×
499
  }
500

501
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
7,008✔
502
  if (NULL == (*pObj)->pRequests) {
7,008!
503
    taosMemoryFree(*pObj);
×
504
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
505
  }
506

507
  (*pObj)->connType = connType;
7,008✔
508
  (*pObj)->pAppInfo = pAppInfo;
7,008✔
509
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
7,008✔
510
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
7,008✔
511
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
7,008✔
512

513
  if (db != NULL) {
7,008!
514
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
7,008✔
515
  }
516

517
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
7,008!
518

519
  int32_t code = TSDB_CODE_SUCCESS;
7,008✔
520

521
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
7,008✔
522
  if ((*pObj)->id < 0) {
7,008!
523
    tscError("failed to add object to clientConnRefPool");
×
524
    code = terrno;
×
525
    taosMemoryFree(*pObj);
×
526
    return code;
×
527
  }
528

529
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
7,008✔
530

531
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
7,008!
532
  return code;
7,008✔
533
}
534

535
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
201,225✔
536

537
void releaseTscObj(int64_t rid) {
197,695✔
538
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
197,695✔
539
  if (TSDB_CODE_SUCCESS != code) {
197,813!
540
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
541
  }
542
}
197,813✔
543

544
// Creates a request object on the connection identified by `connId`.
//
// A connection reference is acquired for the lifetime of the request, the synchronous
// query parameter block and both semaphores are set up, the connection's current db is
// copied, and the request is registered via registerRequest(). `reqid` == 0 asks for a
// generated request id.
//
// On success *pRequest holds the new request and TSDB_CODE_SUCCESS is returned. On
// failure everything acquired so far is released (including the connection reference
// and the parameter block), *pRequest is set to NULL and the error code is returned.
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;
  bool             interSemInited = false;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }

  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // Never continue with a NULL parameter block, even if terrno was left at success.
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
  interSemInited = true;
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    // A NULL db is only an error when terrno reports one.
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    // The request was never (successfully) registered on this path. Clear a negative
    // ref id left by a failed registerRequest() so that doDestroyRequest() does not
    // deregister it and decrement counters that were never incremented, then drop the
    // connection reference here since deregisterRequest() will not do it.
    if ((*pRequest)->self < 0) {
      (*pRequest)->self = 0;
    }
    doDestroyRequest(*pRequest);
    releaseTscObj(connId);
  } else {
    // Nothing is owned by a fully wired request yet: release the pieces one by one.
    if (interParam != NULL) {
      if (interSemInited) {
        (void)tsem_destroy(&interParam->sem);
      }
      taosMemoryFree(interParam);
    }
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
  }
  *pRequest = NULL;
  return code;
}
597

598
// Frees every buffer owned by a result-info structure and clears the pointers. The
// structure itself is not freed.
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  // convertBuf holds one buffer per column, followed by the array itself
  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
148,677✔
615

616
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
668,828✔
617

618
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
669,369✔
619

620
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
148,609✔
621

622
/// return the most previous req ref id
623
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
148,637✔
624
  int64_t      mostPrevReqRefId = pRequest->self;
148,637✔
625
  SRequestObj *pTmp = pRequest;
148,637✔
626
  while (pTmp->relation.prevRefId) {
148,637!
627
    pTmp = acquireRequest(pTmp->relation.prevRefId);
×
628
    if (pTmp) {
×
629
      mostPrevReqRefId = pTmp->self;
×
630
      (void)releaseRequest(mostPrevReqRefId);  // ignore error
×
631
    } else {
632
      break;
×
633
    }
634
  }
635
  (void)removeRequest(mostPrevReqRefId);  // ignore error
148,637✔
636
  return mostPrevReqRefId;
148,670✔
637
}
638

639
void destroyNextReq(int64_t nextRefId) {
148,604✔
640
  if (nextRefId) {
148,604!
641
    SRequestObj *pObj = acquireRequest(nextRefId);
×
642
    if (pObj) {
×
643
      (void)releaseRequest(nextRefId);  // ignore error
×
644
      (void)releaseRequest(nextRefId);  // ignore error
×
645
    }
646
  }
647
}
148,604✔
648

649
// Removes all requests related to a user request from the request ref pool.
//
// Nothing is done when pRequest is itself a sub request (relation.userRefId is set and
// differs from its own id). Otherwise the chain of previous requests is collected and
// removed starting with the farthest one, then the chain of next requests is removed in
// order. At most 16 previous requests are handled; a longer chain is reported and cut.
void destroySubRequests(SRequestObj *pRequest) {
  // Ref ids are recorded instead of request pointers: the reference taken while walking
  // the chain is dropped immediately, so the objects must not be touched again later.
  int64_t  prevRefIds[16] = {0};
  int32_t  numOfPrev = 0;
  uint64_t tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (pTmp == NULL) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
    if (numOfPrev >= (int32_t)tListLen(prevRefIds)) {
      // Guard against writing past the fixed-size list.
      tscError("too many prev reqs, stop at req ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(tmpRefId);  // ignore error
      break;
    }
    prevRefIds[numOfPrev++] = pTmp->self;
    (void)releaseRequest(tmpRefId);  // ignore error
  }

  for (int32_t i = numOfPrev - 1; i >= 0; i--) {
    (void)removeRequest(prevRefIds[i]);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      tmpRefId = pTmp->relation.nextRefId;
      (void)removeRequest(pTmp->self);   // ignore error
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
688

689
void doDestroyRequest(void *p) {
148,644✔
690
  if (NULL == p) {
148,644!
691
    return;
×
692
  }
693

694
  SRequestObj *pRequest = (SRequestObj *)p;
148,644✔
695

696
  uint64_t reqId = pRequest->requestId;
148,644✔
697
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
148,644!
698

699
  int64_t nextReqRefId = pRequest->relation.nextRefId;
148,644✔
700

701
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
148,644✔
702
  if (TSDB_CODE_SUCCESS != code) {
148,687✔
703
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
7,008✔
704
  }
705
  schedulerFreeJob(&pRequest->body.queryJob, 0);
148,687✔
706

707
  destorySqlCallbackWrapper(pRequest->pWrapper);
148,649✔
708

709
  taosMemoryFreeClear(pRequest->msgBuf);
148,630!
710

711
  doFreeReqResultInfo(&pRequest->body.resInfo);
148,570✔
712
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
148,575!
713
    tscError("failed to destroy semaphore");
×
714
  }
715

716
  taosArrayDestroy(pRequest->tableList);
148,635✔
717
  taosArrayDestroy(pRequest->targetTableList);
148,562✔
718
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
148,616✔
719

720
  if (pRequest->self) {
148,620✔
721
    deregisterRequest(pRequest);
148,616✔
722
  }
723

724
  taosMemoryFreeClear(pRequest->pDb);
148,668!
725
  taosArrayDestroy(pRequest->dbList);
148,552✔
726
  if (pRequest->body.interParam) {
148,620!
727
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
148,627!
728
      tscError("failed to destroy semaphore in pRequest");
×
729
    }
730
  }
731
  taosMemoryFree(pRequest->body.interParam);
148,636!
732

733
  qDestroyQuery(pRequest->pQuery);
148,566✔
734
  nodesDestroyAllocator(pRequest->allocatorRefId);
148,592✔
735

736
  taosMemoryFreeClear(pRequest->effectiveUser);
148,643!
737
  taosMemoryFreeClear(pRequest->sqlstr);
148,643!
738
  taosMemoryFree(pRequest);
148,634!
739
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
148,618!
740
  destroyNextReq(nextReqRefId);
148,618✔
741
}
742

743
// Stops the request's query via taos_stop_query() and then removes the head of its
// previous-request chain from the request ref pool. NULL is ignored.
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
749

750
/**
 * Flag a request as killed and, when it runs in QUERY_EXEC_MODE_SCHEDULE,
 * free its scheduler job with TSDB_CODE_TSC_QUERY_KILLED.
 *
 * @param pRequest request to stop; dereferenced unconditionally
 */
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  bool scheduled = (pRequest->pQuery != NULL) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);
  if (scheduled) {
    schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
    tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
  } else {
    // It is not a query, no need to stop.
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
  }
}
762

763
/**
 * Stop a request together with the requests linked to it through
 * relation.prevRefId and relation.nextRefId.
 *
 * Nothing is done when the request has a non-zero relation.userRefId that
 * differs from its own ref id. Otherwise the previous requests are collected
 * first and stopped starting from the one farthest up the chain, then the
 * request itself is stopped, then the following requests in link order.
 *
 * At most sizeof(pReqList)/sizeof(pReqList[0]) previous requests are
 * collected; a longer chain is cut off with an error log instead of writing
 * past the end of the local array.
 *
 * @param pRequest request whose whole chain should be stopped
 */
void stopAllQueries(SRequestObj *pRequest) {
  int32_t       reqIdx = -1;
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t reqListCap = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    if (reqIdx + 1 >= reqListCap) {
      // The list is full: give back the reference just taken and stop collecting.
      tscError("too many prev reqs, stop collecting at ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(pTmp->self);  // ignore error
      break;
    }

    pReqList[++reqIdx] = pTmp;
  }

  // Walk the collected list backwards so the last acquired request is stopped first.
  for (int32_t i = reqIdx; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the next link before the reference is released.
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
804
#ifdef USE_REPORT
805
/**
 * Atomically store -1 into clientStop. The crash report thread registers this
 * with atexit() on WINDOWS builds when taosCheckCurrentInDll() is true.
 */
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
806

807
static void *tscCrashReportThreadFp(void *param) {
×
808
  int32_t code = 0;
×
809
  setThreadName("client-crashReport");
×
810
  char filepath[PATH_MAX] = {0};
×
811
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
812
  char     *pMsg = NULL;
×
813
  int64_t   msgLen = 0;
×
814
  TdFilePtr pFile = NULL;
×
815
  bool      truncateFile = false;
×
816
  int32_t   sleepTime = 200;
×
817
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
818
  int32_t   loopTimes = reportPeriodNum;
×
819

820
#ifdef WINDOWS
821
  if (taosCheckCurrentInDll()) {
822
    atexit(crashReportThreadFuncUnexpectedStopped);
823
  }
824
#endif
825

826
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
827
    return NULL;
×
828
  }
829
  STelemAddrMgmt mgt;
830
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
831
  if (code) {
×
832
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
833
    return NULL;
×
834
  }
835

836
  code = initCrashLogWriter();
×
837
  if (code) {
×
838
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
839
    return NULL;
×
840
  }
841

842
  while (1) {
843
    checkAndPrepareCrashInfo();
×
844
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
845
    if (loopTimes++ < reportPeriodNum) {
×
846
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
847
      taosMsleep(sleepTime);
×
848
      continue;
×
849
    }
850

851
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
852
    if (pMsg && msgLen > 0) {
×
853
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
854
        tscError("failed to send crash report");
×
855
        if (pFile) {
×
856
          taosReleaseCrashLogFile(pFile, false);
×
857
          pFile = NULL;
×
858

859
          taosMsleep(sleepTime);
×
860
          loopTimes = 0;
×
861
          continue;
×
862
        }
863
      } else {
864
        tscInfo("succeed to send crash report");
×
865
        truncateFile = true;
×
866
      }
867
    } else {
868
      tscInfo("no crash info was found");
×
869
    }
870

871
    taosMemoryFree(pMsg);
×
872

873
    if (pMsg && msgLen > 0) {
×
874
      pMsg = NULL;
×
875
      continue;
×
876
    }
877

878
    if (pFile) {
×
879
      taosReleaseCrashLogFile(pFile, truncateFile);
×
880
      pFile = NULL;
×
881
      truncateFile = false;
×
882
    }
883

884
    taosMsleep(sleepTime);
×
885
    loopTimes = 0;
×
886
  }
887
  taosTelemetryDestroy(&mgt);
×
888

889
  clientStop = -2;
×
890
  return NULL;
×
891
}
892

893
int32_t tscCrashReportInit() {
2,036✔
894
  if (!tsEnableCrashReport) {
2,036!
895
    return TSDB_CODE_SUCCESS;
2,036✔
896
  }
897
  int32_t      code = TSDB_CODE_SUCCESS;
×
898
  TdThreadAttr thAttr;
899
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
900
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
901
  TdThread crashReportThread;
902
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
903
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
904
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
905
    TSC_ERR_RET(terrno);
×
906
  }
907

908
  (void)taosThreadAttrDestroy(&thAttr);
×
909
_return:
×
910
  if (code) {
×
911
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
912
    TSC_ERR_RET(terrno);
×
913
  }
914

915
  return code;
×
916
}
917

918
void tscStopCrashReport() {
2,037✔
919
  if (!tsEnableCrashReport) {
2,037!
920
    return;
2,037✔
921
  }
922

923
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
924
    tscDebug("crash report thread already stopped");
×
925
    return;
×
926
  }
927

928
  while (atomic_load_32(&clientStop) > 0) {
×
929
    taosMsleep(100);
×
930
  }
931
}
932

933
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
934
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
935
}
×
936
#endif
937

938
#ifdef TAOSD_INTEGRATED
939
typedef struct {
940
  TdThread pid;
941
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
942
} SDaemonObj;
943

944
extern int  dmStartDaemon(int argc, char const *argv[]);
945
extern void dmStopDaemon();
946

947
SDaemonObj daemonObj = {0};
948

949
typedef struct {
950
  int32_t argc;
951
  char  **argv;
952
} SExecArgs;
953

954
static void *dmStartDaemonFunc(void *param) {
955
  int32_t    code = 0;
956
  SExecArgs *pArgs = (SExecArgs *)param;
957
  int32_t    argc = pArgs->argc;
958
  char     **argv = pArgs->argv;
959

960
  code = dmStartDaemon(argc, (const char **)argv);
961
  if (code != 0) {
962
    printf("failed to start taosd since %s\r\n", tstrerror(code));
963
    goto _exit;
964
  }
965

966
_exit:
967
  if (code != 0) {
968
    atomic_store_32(&daemonObj.stat, code);
969
  }
970
  return NULL;
971
}
972

973
static int32_t shellStartDaemon(int argc, char *argv[]) {
974
  int32_t    code = 0, lino = 0;
975
  SExecArgs *pArgs = NULL;
976
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
977

978
  TdThreadAttr thAttr;
979
  (void)taosThreadAttrInit(&thAttr);
980
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
981
#ifdef TD_COMPACT_OS
982
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
983
#endif
984
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
985
  if (pArgs == NULL) {
986
    code = terrno;
987
    TAOS_CHECK_EXIT(code);
988
  }
989
  pArgs->argc = argc;
990
  pArgs->argv = argv;
991

992
#ifndef TD_AS_LIB
993
  tsLogEmbedded = 1;
994
#endif
995

996
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
997

998
  while (true) {
999
    if (atomic_load_64(&tsDndStart)) {
1000
      atomic_store_32(&daemonObj.stat, 1);
1001
      break;
1002
    }
1003
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
1004
    if (daemonstat < 0) {
1005
      code = daemonstat;
1006
      TAOS_CHECK_EXIT(code);
1007
    }
1008

1009
    if (daemonstat > 1) {
1010
      code = TSDB_CODE_APP_ERROR;
1011
      TAOS_CHECK_EXIT(code);
1012
    }
1013
    taosMsleep(1000);
1014
  }
1015

1016
_exit:
1017
  endMs = taosGetTimestampMs();
1018
  (void)taosThreadAttrDestroy(&thAttr);
1019
  taosMemoryFreeClear(pArgs);
1020
  if (code) {
1021
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1022
           endMs - startMs);
1023
  } else {
1024
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1025
  }
1026
#ifndef TD_AS_LIB
1027
  tsLogEmbedded = 0;
1028
#endif
1029
  return code;
1030
}
1031

1032
void shellStopDaemon() {
1033
#ifndef TD_AS_LIB
1034
  tsLogEmbedded = 1;
1035
#endif
1036
  dmStopDaemon();
1037
  if (taosCheckPthreadValid(daemonObj.pid)) {
1038
    (void)taosThreadJoin(daemonObj.pid, NULL);
1039
    taosThreadClear(&daemonObj.pid);
1040
  }
1041
}
1042
#endif
1043

1044
void taos_init_imp(void) {
2,037✔
1045
#if defined(LINUX)
1046
  if (tscDbg.memEnable) {
2,037!
1047
    int32_t code = taosMemoryDbgInit();
×
1048
    if (code) {
×
1049
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
1050
    } else {
1051
      tsAsyncLog = false;
×
1052
      (void)printf("memory dbg enabled\n");
×
1053
    }
1054
  }
1055
#endif
1056

1057
  // In the APIs of other program language, taos_cleanup is not available yet.
1058
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
1059
  (void)atexit(taos_cleanup);
2,037✔
1060
  SET_ERRNO(TSDB_CODE_SUCCESS);
2,037✔
1061
  terrno = TSDB_CODE_SUCCESS;
2,037✔
1062
  taosSeedRand(taosGetTimestampSec());
2,037✔
1063

1064
  appInfo.pid = taosGetPId();
2,037✔
1065
  appInfo.startTime = taosGetTimestampMs();
2,037✔
1066
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
2,037✔
1067
  appInfo.pInstMapByClusterId =
2,037✔
1068
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
2,037✔
1069
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
2,037!
1070
    (void)printf("failed to allocate memory when init appInfo\n");
×
1071
    tscInitRes = terrno;
×
1072
    return;
1✔
1073
  }
1074
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
2,037✔
1075

1076
  const char *logName = CUS_PROMPT "log";
2,037✔
1077
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
2,037!
1078
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
2,037✔
1079
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
1✔
1080
    tscInitRes = terrno;
1✔
1081
    return;
1✔
1082
  }
1083

1084
#ifdef TAOSD_INTEGRATED
1085
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
1086
#else
1087
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
2,036!
1088
#endif
1089

1090
  initQueryModuleMsgHandle();
2,036✔
1091
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
1092
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
2,036!
1093
    tscInitRes = terrno;
×
1094
    tscError("failed to init conv");
×
1095
    return;
×
1096
  }
1097
#endif
1098
#if !defined(WINDOWS) && !defined(TD_ASTRA)
1099
  ENV_ERR_RET(tzInit(), "failed to init timezone");
2,036!
1100
#endif
1101
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
2,036!
1102
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
2,036!
1103

1104
  if (InitRegexCache() != 0) {
2,036!
1105
    tscInitRes = terrno;
×
1106
    (void)printf("failed to init regex cache\n");
×
1107
    return;
×
1108
  }
1109

1110
  tscInfo("starting to initialize TAOS driver");
2,036!
1111

1112
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
2,036✔
1113
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
2,036!
1114
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
2,036!
1115
  ENV_ERR_RET(initClientId(), "failed to init clientId");
2,036!
1116

1117
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
2,036!
1118
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
2,036!
1119
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
2,036!
1120

1121
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
2,036✔
1122
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
2,036✔
1123

1124
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
2,036!
1125
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
2,036!
1126
#ifdef USE_REPORT
1127
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
2,036!
1128
#endif
1129
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
2,036!
1130
#ifdef TAOSD_INTEGRATED
1131
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
1132
#endif
1133
  tscInfo("TAOS driver is initialized successfully");
2,036!
1134
}
1135

1136
int taos_init() {
2,870✔
1137
  (void)taosThreadOnce(&tscinit, taos_init_imp);
2,870✔
1138
  return tscInitRes;
2,870✔
1139
}
1140

1141
/**
 * Map a TSDB_OPTION value to the name of its configuration item.
 *
 * @param option option identifier
 * @return the configuration item name, or NULL for an option without a
 *         mapping in this table
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      return NULL;
  }
}
1166

1167
/**
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR is handled without initialising the driver: the value
 * is copied into configDir. On non-Windows builds a value longer than one
 * character that does not start with a quote is wrapped in double quotes
 * first. Every other option triggers taos_init() and is then applied to the
 * global configuration through cfgSetItem().
 *
 * @param option option identifier
 * @param str    option value; must not be NULL for TSDB_OPTION_CONFIGDIR
 * @return 0 on success; -1 for a NULL or too long config dir, a failed
 *         taos_init(), or an unknown option; otherwise the code returned by
 *         cfgSetItem() / taosCfgDynamicOptions()
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (option == TSDB_OPTION_CONFIGDIR) {
    if (str == NULL) {
      tscError("Invalid config dir: NULL");
      return -1;
    }
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Room is needed for both quotes and the terminating NUL.
      if (len + 2 >= (size_t)PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", "configDir", str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    // Only these two options are additionally passed to taosCfgDynamicOptions().
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1221

1222
/**
1223
 * The request id is an unsigned integer format of 64bit.
1224
 *+------------+-----+-----------+---------------+
1225
 *| uid|localIp| PId | timestamp | serial number |
1226
 *+------------+-----+-----------+---------------+
1227
 *| 12bit      |12bit|24bit      |16bit          |
1228
 *+------------+-----+-----------+---------------+
1229
 * @return
1230
 */
1231
uint64_t generateRequestId() {
252,140✔
1232
  static uint32_t hashId = 0;
1233
  static int32_t  requestSerialId = 0;
1234

1235
  if (hashId == 0) {
252,140✔
1236
    int32_t code = taosGetSystemUUIDU32(&hashId);
2,020✔
1237
    if (code != TSDB_CODE_SUCCESS) {
2,020!
1238
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1239
               tstrerror(code));
1240
    }
1241
  }
1242

1243
  uint64_t id = 0;
252,145✔
1244

1245
  while (true) {
×
1246
    int64_t  ts = taosGetTimestampMs();
252,140✔
1247
    uint64_t pid = taosGetPId();
252,140✔
1248
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
252,153✔
1249
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
252,198!
1250

1251
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
252,198✔
1252
    if (id) {
252,198!
1253
      break;
252,198✔
1254
    }
1255
  }
1256
  return id;
252,198✔
1257
}
1258

1259
#if 0
1260
#include "cJSON.h"
1261
static setConfRet taos_set_config_imp(const char *config){
1262
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1263
  static bool setConfFlag = false;
1264
  if (setConfFlag) {
1265
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1266
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1267
    return ret;
1268
  }
1269
  taosInitGlobalCfg();
1270
  cJSON *root = cJSON_Parse(config);
1271
  if (root == NULL){
1272
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1273
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1274
    return ret;
1275
  }
1276

1277
  int size = cJSON_GetArraySize(root);
1278
  if(!cJSON_IsObject(root) || size == 0) {
1279
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1280
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1281
    return ret;
1282
  }
1283

1284
  if(size >= 1000) {
1285
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1286
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1287
    return ret;
1288
  }
1289

1290
  for(int i = 0; i < size; i++){
1291
    cJSON *item = cJSON_GetArrayItem(root, i);
1292
    if(!item) {
1293
      ret.retCode = SET_CONF_RET_ERR_INNER;
1294
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1295
      return ret;
1296
    }
1297
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1298
      ret.retCode = SET_CONF_RET_ERR_PART;
1299
      if (strlen(ret.retMsg) == 0){
1300
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1301
      }else{
1302
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1303
        size_t leftSize = tmp >= 0 ? tmp : 0;
1304
        strncat(ret.retMsg, "|",  leftSize);
1305
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1306
        leftSize = tmp >= 0 ? tmp : 0;
1307
        strncat(ret.retMsg, item->string, leftSize);
1308
      }
1309
    }
1310
  }
1311
  cJSON_Delete(root);
1312
  setConfFlag = true;
1313
  return ret;
1314
}
1315

1316
setConfRet taos_set_config(const char *config){
1317
  taosThreadMutexLock(&setConfMutex);
1318
  setConfRet ret = taos_set_config_imp(config);
1319
  taosThreadMutexUnlock(&setConfMutex);
1320
  return ret;
1321
}
1322
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc