• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tglobal.h"
31
#include "thttp.h"
32
#include "tmsg.h"
33
#include "tqueue.h"
34
#include "tref.h"
35
#include "trpc.h"
36
#include "tsched.h"
37
#include "ttime.h"
38
#include "tversion.h"
39
#include "tconv.h"
40

41
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
42
#include "cus_name.h"
43
#endif
44

45
#ifndef CUS_PROMPT
46
#define CUS_PROMPT "tao"
47
#endif
48

49
#define TSC_VAR_NOT_RELEASE 1
50
#define TSC_VAR_RELEASED    0
51

52
/*
 * Bail out of a JSON-building function when an "add item" step fails.
 *
 * If `c` evaluates to false/0/NULL, an error is logged, the expanding function's
 * local `code` is set to TSDB_CODE_TSC_FAIL_GENERATE_JSON and control jumps to its
 * `_end` label. The expanding function must therefore declare `code` and provide
 * an `_end:` label.
 *
 * The argument is parenthesized so that a compound expression (for example
 * `a + b` or `x == y`) is negated as a whole rather than only its first operand.
 */
#define ENV_JSON_FALSE_CHECK(c)                     \
  do {                                              \
    if (!(c)) {                                     \
      tscError("faild to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;      \
      goto _end;                                    \
    }                                               \
  } while (0)
60

61
/*
 * Abort a `void` initialisation routine on the first failing step.
 *
 * Evaluates `c` once; when the result is not TSDB_CODE_SUCCESS the value is stored
 * in both `errno` and the global `tscInitRes`, `info` is logged through tscError()
 * and the expanding function returns immediately.
 */
#define ENV_ERR_RET(c, info)                   \
  do {                                         \
    int32_t envStepCode_ = c;                  \
    if (TSDB_CODE_SUCCESS != envStepCode_) {   \
      errno = envStepCode_;                    \
      tscInitRes = envStepCode_;               \
      tscError(info);                          \
      return;                                  \
    }                                          \
  } while (0)
71

72
STscDbg  tscDbg = {0};
73
SAppInfo appInfo;
74
int64_t  lastClusterId = 0;
75
int32_t  clientReqRefPool = -1;
76
int32_t  clientConnRefPool = -1;
77
int32_t  clientStop = -1;
78
SHashObj* pTimezoneMap = NULL;
79

80
int32_t timestampDeltaLimit = 900;  // s
81

82
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
83
volatile int32_t    tscInitRes = 0;
84

85
// Add a new request to the request ref pool and bump the per-connection and
// per-app request counters.
//
// pRequest: request to register; its `self` field receives the ref id.
// pTscObj:  owning connection; `numOfReqs` is incremented atomically.
// Returns TSDB_CODE_SUCCESS, or terrno when the ref pool rejects the request
// (in which case `pRequest->self` keeps the negative value returned by taosAddRef).
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t connReqs = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  // Without app info there are no cluster-level counters to update.
  if (NULL == pTscObj->pAppInfo) {
    return TSDB_CODE_SUCCESS;
  }

  SAppClusterSummary *pStat = &pTscObj->pAppInfo->summary;

  int32_t totalReqs = atomic_add_fetch_64((int64_t *)&pStat->totalRequests, 1);
  int32_t appReqs = atomic_add_fetch_64((int64_t *)&pStat->currentRequests, 1);
  tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
           ", current:%d, app current:%d, total:%d,QID:0x%" PRIx64,
           pRequest->self, pRequest->pTscObj->id, connReqs, appReqs, totalReqs, pRequest->requestId);

  return TSDB_CODE_SUCCESS;
}
109

110
// Join the entries of `list` into `buf` as a comma-separated string.
//
// Each entry is a C string; when it contains a '.', only the part after the
// first '.' is kept. `buf` must already hold a NUL-terminated (normally empty)
// string and `size` is the capacity the function may use. Joining stops early
// when an entry cannot be fetched, formatting fails, or the buffer is full.
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;
  int idx = 0;

  while (idx < taosArrayGetSize(list)) {
    char *name = taosArrayGet(list, idx);
    if (NULL == name) {
      tscError("get dbname failed, buf:%s", buf);
      break;
    }

    // Drop everything up to and including the first '.'.
    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    // Separator before every entry except the first.
    if (idx != 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      break;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      break;
    }

    idx++;
  }
}
80✔
138

139
// Build the JSON slow-log record for one request and push it to the monitor queue.
//
// pTscObj:  connection the request ran on (source of cluster id, user, server config).
// pRequest: finished request; its SQL text is temporarily truncated in place when it
//           is longer than the configured tsSlowLogMaxLen and is always restored
//           before this function returns.
// reqType:  slow-log request type stored in the "type" field.
// duration: elapsed time in microseconds (reported as duration / 1000).
//
// Returns TSDB_CODE_SUCCESS on success; TSDB_CODE_OUT_OF_MEMORY if the JSON root cannot
// be created; TSDB_CODE_TSC_FAIL_GENERATE_JSON if a field cannot be added;
// TSDB_CODE_FAILED on formatting/printing failures; or the error returned by
// monitorPutData2MonitorQueue(). On success the printed JSON string is handed to the
// monitor queue; on queue failure it is freed here.
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("[monitor] cJSON_CreateObject failed");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 64-bit ids are emitted as strings.
  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));

  if (pRequest->sqlstr != NULL &&
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Temporarily cut the SQL text at the configured maximum length. The overwritten
    // byte is put back BEFORE the result is checked, so an early exit through `_end`
    // can no longer leave the request's SQL string truncated.
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    bool sqlAdded = cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(sqlAdded);
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));

  // "db": prefer the list of databases touched by the request, then the connection's
  // current database, otherwise an empty string.
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // The queue did not take the string, so it is released here.
    taosMemoryFree(value);
  }

_end:
  cJSON_Delete(json);
  return code;
}
227

228
// Decide whether a request is outside the slow-log "except" database.
//
// Returns true when the request should be slow-logged: its current database
// (pDb) differs from `exceptDb`, or - when pDb is not set - none of the entries in
// dbList (compared after stripping everything up to the first '.') equals
// `exceptDb`. Returns false on a match or when a dbList entry cannot be read.
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (NULL != pRequest->pDb) {
    return 0 != strcmp(pRequest->pDb, exceptDb);
  }

  int idx = 0;
  while (idx < taosArrayGetSize(pRequest->dbList)) {
    char *name = taosArrayGet(pRequest->dbList, idx);
    if (name == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    char *sep = strchr(name, '.');
    char *bare = (sep != NULL) ? sep + 1 : name;
    if (0 == strcmp(bare, exceptDb)) {
      return false;
    }

    ++idx;
  }

  return true;
}
249

250
// Undo registerRequest(): update counters, record timing statistics, emit
// monitor / slow-log records and drop the connection reference.
//
// The request is classified as insert / query / other to pick which elapsed-time
// counter to update and which slow-log scope bit to test. A request is "slow"
// when its elapsed time is at least tsSlowLogThreshold seconds and it does not
// target the configured except-db.
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ",QID:0x%" PRIx64
           " elapsed:%.2f ms, "
           "current:%d, app current:%d",
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the node allocator is held.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // Threshold is configured in seconds; compare in signed 64-bit microseconds.
  // Using an unsigned literal here would convert `duration` to unsigned, so a
  // negative duration (timestamps are differences of two clock readings) would be
  // treated as enormous, and the product could wrap where `unsigned long` is 32 bits.
  // NOTE(review): assumes tsSlowLogThreshold is never negative - confirm the config range.
  const int64_t slowThresholdUs =
      (int64_t)pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000LL;

  if ((duration >= slowThresholdUs) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }

  // Drop one reference on the owning connection object.
  releaseTscObj(pTscObj->id);
}
322

323
// todo close the transporter properly
324
// Close the RPC transporter owned by an app instance, if there is one.
// A NULL instance or a NULL transporter is ignored.
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
332

333
// Callback installed as SRpcInit.rfp by openTransporter().
//
// Returns true only when `code` satisfies NEED_REDIRECT_ERROR and the message is
// not one of the scheduler message types listed below; false otherwise.
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (!(NEED_REDIRECT_ERROR(code))) {
    return false;
  }

  bool isSchedulerMsg = (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                         msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                         msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK ||
                         msgType == TDMT_SCH_TASK_NOTIFY);
  return !isSchedulerMsg;
}
345

346
// start timer for particular msgType
347
// Returns true for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE messages, false for
// every other message type. `code` is not consulted.
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE;
}
353

354
// TODO refactor
355
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
2,142✔
356
  SRpcInit rpcInit;
357
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
2,142✔
358
  rpcInit.localPort = 0;
2,142✔
359
  rpcInit.label = "TSC";
2,142✔
360
  rpcInit.numOfThreads = tsNumOfRpcThreads;
2,142✔
361
  rpcInit.cfp = processMsgFromServer;
2,142✔
362
  rpcInit.rfp = clientRpcRfp;
2,142✔
363
  rpcInit.sessions = 1024;
2,142✔
364
  rpcInit.connType = TAOS_CONN_CLIENT;
2,142✔
365
  rpcInit.user = (char *)user;
2,142✔
366
  rpcInit.idleTime = tsShellActivityTimer * 1000;
2,142✔
367
  rpcInit.compressSize = tsCompressMsgSize;
2,142✔
368
  rpcInit.dfp = destroyAhandle;
2,142✔
369

370
  rpcInit.retryMinInterval = tsRedirectPeriod;
2,142✔
371
  rpcInit.retryStepFactor = tsRedirectFactor;
2,142✔
372
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
2,142✔
373
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
2,142✔
374

375
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
2,142✔
376
  connLimitNum = TMAX(connLimitNum, 10);
2,142✔
377
  connLimitNum = TMIN(connLimitNum, 1000);
2,142✔
378
  rpcInit.connLimitNum = connLimitNum;
2,142✔
379
  rpcInit.shareConnLimit = tsShareConnLimit;
2,142✔
380
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
2,142✔
381
  rpcInit.startReadTimer = 1;
2,142✔
382
  rpcInit.readTimeout = tsReadTimeout;
2,142✔
383

384
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
2,142✔
385
  if (TSDB_CODE_SUCCESS != code) {
2,142!
386
    tscError("invalid version string.");
×
387
    return code;
×
388
  }
389

390
  *pDnodeConn = rpcOpen(&rpcInit);
2,142✔
391
  if (*pDnodeConn == NULL) {
2,142!
392
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
393
    code = terrno;
×
394
  }
395

396
  return code;
2,142✔
397
}
398

399
// Destroy every request whose ref id is stored in the `pRequests` hash.
// Entries whose request can no longer be acquired are skipped.
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (NULL == pReq) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
7,517✔
413

414
// Call taos_stop_query() on every request whose ref id is stored in the
// `pRequests` hash. Entries whose request can no longer be acquired are skipped.
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (NULL == pReq) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
5✔
428

429
void destroyAppInst(void *info) {
×
430
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
×
431
  tscDebug("destroy app inst mgr %p", pAppInfo);
×
432

433
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
×
434
  if (TSDB_CODE_SUCCESS != code) {
×
435
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
436
  }
437

438
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
×
439

440
  code = taosThreadMutexUnlock(&appInfo.mutex);
×
441
  if (TSDB_CODE_SUCCESS != code) {
×
442
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
443
  }
444

445
  taosMemoryFreeClear(pAppInfo->instKey);
×
446
  closeTransporter(pAppInfo);
×
447

448
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
×
449
  if (TSDB_CODE_SUCCESS != code) {
×
450
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
451
  }
452

453
  taosArrayDestroy(pAppInfo->pQnodeList);
×
454
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
×
455
  if (TSDB_CODE_SUCCESS != code) {
×
456
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
457
  }
458

459
  taosMemoryFree(pAppInfo);
×
460
}
×
461

462
void destroyTscObj(void *pObj) {
7,517✔
463
  if (NULL == pObj) {
7,517!
464
    return;
×
465
  }
466

467
  STscObj *pTscObj = pObj;
7,517✔
468
  int64_t  tscId = pTscObj->id;
7,517✔
469
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
7,517✔
470

471
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
7,517✔
472
  hbDeregisterConn(pTscObj, connKey);
7,517✔
473

474
  destroyAllRequests(pTscObj->pRequests);
7,517✔
475
  taosHashCleanup(pTscObj->pRequests);
7,517✔
476

477
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
7,517✔
478
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
7,517✔
479
           pTscObj->pAppInfo->numOfConns);
480

481
  // In any cases, we should not free app inst here. Or an race condition rises.
482
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
7,517✔
483

484
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
7,517✔
485
  taosMemoryFree(pTscObj);
7,517!
486

487
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
7,517✔
488
}
489

490
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
7,980✔
491
                     STscObj **pObj) {
492
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
7,980!
493
  if (NULL == *pObj) {
7,980!
494
    return terrno;
×
495
  }
496

497
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
7,980✔
498
  if (NULL == (*pObj)->pRequests) {
7,980!
499
    taosMemoryFree(*pObj);
×
500
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
501
  }
502

503
  (*pObj)->connType = connType;
7,980✔
504
  (*pObj)->pAppInfo = pAppInfo;
7,980✔
505
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
7,980✔
506
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
7,980✔
507
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
7,980✔
508

509
  if (db != NULL) {
7,980!
510
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
7,980✔
511
  }
512

513
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
7,980!
514

515
  int32_t code = TSDB_CODE_SUCCESS;
7,980✔
516

517
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
7,980✔
518
  if ((*pObj)->id < 0) {
7,980!
519
    tscError("failed to add object to clientConnRefPool");
×
520
    code = terrno;
×
521
    taosMemoryFree(*pObj);
×
522
    return code;
×
523
  }
524

525
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
7,980✔
526

527
  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", (*pObj)->id, *pObj);
7,980✔
528
  return code;
7,980✔
529
}
530

531
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
18,908,109✔
532

533
void releaseTscObj(int64_t rid) {
18,962,085✔
534
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
18,962,085✔
535
  if (TSDB_CODE_SUCCESS != code) {
18,962,345!
536
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
537
  }
538
}
18,962,345✔
539

540
// Allocate, initialise and register a request object for a connection.
//
// connId:   ref id of the owning connection (STscObj).
// type:     request type stored in the request.
// reqid:    request id to use; 0 means "generate one".
// pRequest: receives the new request on success; set to NULL when the function
//           fails after the initial allocation.
//
// Returns TSDB_CODE_SUCCESS or an error code. On every failure path the connection
// reference taken by acquireTscObj() and any partially built state (the sync-query
// parameter and its semaphore) are released:
//   - if the sync-query semaphore cannot be initialised, the parameter block is freed
//     without destroying a semaphore that never existed;
//   - if the failure happens before the connection is attached to the request, the
//     parameter block and the connection reference are released here;
//   - if it happens after, doDestroyRequest() cleans the request; it drops the
//     connection reference only when `self` is non-zero, so this function drops it
//     in the remaining case.
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }

  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // Never continue with a NULL parameter block, even if terrno was not set.
    TSC_ERR_JRET(terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY);
  }

  code = tsem_init(&interParam->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // The semaphore was never initialised: free the block without destroying it.
    taosMemoryFree(interParam);
    interParam = NULL;
    TSC_ERR_JRET(code);
  }
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    // A NULL db name is only an error when terrno says so.
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    // Read `self` before the request is freed: doDestroyRequest() releases the
    // connection reference (via deregisterRequest) only when it is non-zero.
    bool refDroppedByDestroy = ((*pRequest)->self != 0);
    doDestroyRequest(*pRequest);
    if (!refDroppedByDestroy) {
      releaseTscObj(connId);
    }
  } else {
    // The connection was not attached yet, so clean up by hand.
    if (interParam != NULL) {
      if (TSDB_CODE_SUCCESS != tsem_destroy(&interParam->sem)) {
        tscError("failed to destroy semaphore in pRequest");
      }
      taosMemoryFree(interParam);
    }
    taosMemoryFree(*pRequest);
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
  }
  *pRequest = NULL;
  return code;
}
593

594
// Free every buffer owned by a request's result info and reset the pointers.
// The structure itself is not freed.
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  // One conversion buffer per column, then the array of buffers itself.
  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
10,628,009✔
611

612
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
43,783,680✔
613

614
// Release one reference on a request in the request ref pool.
// Returns the result of taosReleaseRef().
int32_t releaseRequest(int64_t rid) {
  int32_t rc = taosReleaseRef(clientReqRefPool, rid);
  return rc;
}
44,527,696✔
615

616
// Remove a request from the request ref pool.
// Returns the result of taosRemoveRef().
int32_t removeRequest(int64_t rid) {
  int32_t rc = taosRemoveRef(clientReqRefPool, rid);
  return rc;
}
10,267,326✔
617

618
/// return the most previous req ref id
619
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
10,262,938✔
620
  int64_t      mostPrevReqRefId = pRequest->self;
10,262,938✔
621
  SRequestObj *pTmp = pRequest;
10,262,938✔
622
  while (pTmp->relation.prevRefId) {
10,263,422✔
623
    pTmp = acquireRequest(pTmp->relation.prevRefId);
484✔
624
    if (pTmp) {
484!
625
      mostPrevReqRefId = pTmp->self;
484✔
626
      (void)releaseRequest(mostPrevReqRefId);  // ignore error
484✔
627
    } else {
628
      break;
×
629
    }
630
  }
631
  (void)removeRequest(mostPrevReqRefId);  // ignore error
10,262,938✔
632
  return mostPrevReqRefId;
10,275,551✔
633
}
634

635
void destroyNextReq(int64_t nextRefId) {
10,285,312✔
636
  if (nextRefId) {
10,285,312✔
637
    SRequestObj *pObj = acquireRequest(nextRefId);
508✔
638
    if (pObj) {
508!
639
      (void)releaseRequest(nextRefId);  // ignore error
508✔
640
      (void)releaseRequest(nextRefId);  // ignore error
508✔
641
    }
642
  }
643
}
10,285,312✔
644

645
// Remove every request linked to `pRequest` (both the "prev" and the "next" chain)
// from the request ref pool.
//
// Nothing is done when `pRequest` has a non-zero relation.userRefId different from
// its own ref id. Previous requests are collected first and removed starting from
// the one farthest back in the chain; next requests are removed while walking
// forward. At most as many previous requests as fit in the local list are collected;
// a longer chain is reported and truncated instead of writing past the array.
void destroySubRequests(SRequestObj *pRequest) {
  int32_t      reqIdx = -1;
  SRequestObj *pReqList[16] = {NULL};
  uint64_t     tmpRefId = 0;

  const int32_t maxPrevReqs = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;

    // Bound check before storing: the list has a fixed capacity.
    if (reqIdx + 1 >= maxPrevReqs) {
      tscError("too many prev requests, stop collecting at ref 0x%" PRIx64, tmpRefId);
      break;
    }

    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      pReqList[++reqIdx] = pTmp;
      (void)releaseRequest(tmpRefId);  // ignore error
    } else {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }

  for (int32_t i = reqIdx; i >= 0; i--) {
    (void)removeRequest(pReqList[i]->self);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the link before the request is removed.
      tmpRefId = pTmp->relation.nextRefId;
      (void)removeRequest(pTmp->self);   // ignore error
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
684

685
void doDestroyRequest(void *p) {
10,275,309✔
686
  if (NULL == p) {
10,275,309!
687
    return;
×
688
  }
689

690
  SRequestObj *pRequest = (SRequestObj *)p;
10,275,309✔
691

692
  uint64_t reqId = pRequest->requestId;
10,275,309✔
693
  tscDebug("begin to destroy request 0x%" PRIx64 " p:%p", reqId, pRequest);
10,275,309✔
694

695
  int64_t nextReqRefId = pRequest->relation.nextRefId;
10,275,309✔
696

697
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
10,275,309✔
698
  if (TSDB_CODE_SUCCESS != code) {
10,273,272✔
699
    tscWarn("failed to remove request from hash, code:%s", tstrerror(code));
7,980!
700
  }
701
  schedulerFreeJob(&pRequest->body.queryJob, 0);
10,273,272✔
702

703
  destorySqlCallbackWrapper(pRequest->pWrapper);
10,267,171✔
704

705
  taosMemoryFreeClear(pRequest->msgBuf);
10,265,958!
706

707
  doFreeReqResultInfo(&pRequest->body.resInfo);
10,269,265✔
708
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
10,264,492!
709
    tscError("failed to destroy semaphore");
×
710
  }
711

712
  taosArrayDestroy(pRequest->tableList);
10,263,499✔
713
  taosArrayDestroy(pRequest->targetTableList);
10,257,391✔
714
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,255,367✔
715

716
  if (pRequest->self) {
10,274,145✔
717
    deregisterRequest(pRequest);
10,272,501✔
718
  }
719

720
  taosMemoryFreeClear(pRequest->pDb);
10,291,481!
721
  taosArrayDestroy(pRequest->dbList);
10,291,474✔
722
  if (pRequest->body.interParam) {
10,289,870✔
723
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
10,289,835!
724
      tscError("failed to destroy semaphore in pRequest");
×
725
    }
726
  }
727
  taosMemoryFree(pRequest->body.interParam);
10,289,880!
728

729
  qDestroyQuery(pRequest->pQuery);
10,289,845✔
730
  nodesDestroyAllocator(pRequest->allocatorRefId);
10,284,323✔
731

732
  taosMemoryFreeClear(pRequest->effectiveUser);
10,281,500!
733
  taosMemoryFreeClear(pRequest->sqlstr);
10,281,500!
734
  taosMemoryFree(pRequest);
10,285,542!
735
  tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
10,286,667✔
736
  destroyNextReq(nextReqRefId);
10,286,668✔
737
}
738

739
// Stop a request's query and remove the earliest request of its chain from the
// ref pool. NULL is ignored.
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
747

748
// Flag the request as killed and, when it is executing as a scheduled query,
// free its scheduler job with TSDB_CODE_TSC_QUERY_KILLED.
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  const bool isScheduledQuery =
      (pRequest->pQuery != NULL) && (pRequest->pQuery->execMode == QUERY_EXEC_MODE_SCHEDULE);

  if (!isScheduledQuery) {
    // Nothing is running in the scheduler for this request, so there is nothing to stop.
    tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
    return;
  }

  schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
  tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
760

761
// Stop pRequest together with the requests linked to it through relation.prevRefId
// and relation.nextRefId.
//
// Nothing is done when relation.userRefId is set and differs from pRequest->self.
// Otherwise the previous requests are acquired while walking the prevRefId links,
// stopped in reverse order of acquisition and released; then pRequest itself is
// stopped; finally the nextRefId links are followed, stopping and releasing each
// request on the way. A reference that can no longer be acquired ends the
// corresponding walk with an error log.
void stopAllQueries(SRequestObj *pRequest) {
  enum { MAX_PREV_REQS = 16 };  // capacity of the on-stack list of acquired previous requests

  int32_t      reqIdx = -1;
  SRequestObj *pReqList[MAX_PREV_REQS] = {NULL};
  uint64_t     tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;

    // Check the capacity before acquiring, so that an over-long chain can neither
    // write past the end of pReqList nor leave an acquired request unreleased.
    if (reqIdx + 1 >= MAX_PREV_REQS) {
      tscError("too many prev reqs, stop collecting at ref 0x%" PRIx64, tmpRefId);
      break;
    }

    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      pReqList[++reqIdx] = pTmp;
    } else {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }

  // Last acquired (farthest along the prev links) is stopped first.
  for (int32_t i = reqIdx; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the next link before the request is released.
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
802

803
// Atomically set clientStop to -1. The crash report thread registers this with
// atexit() on WINDOWS builds when taosCheckCurrentInDll() is true.
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
804

805
static void *tscCrashReportThreadFp(void *param) {
×
806
  int32_t code = 0;
×
807
  setThreadName("client-crashReport");
×
808
  char filepath[PATH_MAX] = {0};
×
809
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
810
  char     *pMsg = NULL;
×
811
  int64_t   msgLen = 0;
×
812
  TdFilePtr pFile = NULL;
×
813
  bool      truncateFile = false;
×
814
  int32_t   sleepTime = 200;
×
815
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
816
  int32_t   loopTimes = reportPeriodNum;
×
817

818
#ifdef WINDOWS
819
  if (taosCheckCurrentInDll()) {
820
    atexit(crashReportThreadFuncUnexpectedStopped);
821
  }
822
#endif
823

824
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
825
    return NULL;
×
826
  }
827
  STelemAddrMgmt mgt;
828
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
829
  if (code) {
×
830
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
831
    return NULL;
×
832
  }
833

834
  while (1) {
835
    if (clientStop > 0) break;
×
836
    if (loopTimes++ < reportPeriodNum) {
×
837
      taosMsleep(sleepTime);
×
838
      continue;
×
839
    }
840

841
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
842
    if (pMsg && msgLen > 0) {
×
843
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
844
        tscError("failed to send crash report");
×
845
        if (pFile) {
×
846
          taosReleaseCrashLogFile(pFile, false);
×
847
          pFile = NULL;
×
848

849
          taosMsleep(sleepTime);
×
850
          loopTimes = 0;
×
851
          continue;
×
852
        }
853
      } else {
854
        tscInfo("succeed to send crash report");
×
855
        truncateFile = true;
×
856
      }
857
    } else {
858
      tscDebug("no crash info");
×
859
    }
860

861
    taosMemoryFree(pMsg);
×
862

863
    if (pMsg && msgLen > 0) {
×
864
      pMsg = NULL;
×
865
      continue;
×
866
    }
867

868
    if (pFile) {
×
869
      taosReleaseCrashLogFile(pFile, truncateFile);
×
870
      pFile = NULL;
×
871
      truncateFile = false;
×
872
    }
873

874
    taosMsleep(sleepTime);
×
875
    loopTimes = 0;
×
876
  }
877
  taosTelemetryDestroy(&mgt);
×
878

879
  clientStop = -2;
×
880
  return NULL;
×
881
}
882

883
int32_t tscCrashReportInit() {
1,976✔
884
  if (!tsEnableCrashReport) {
1,976!
885
    return TSDB_CODE_SUCCESS;
1,976✔
886
  }
887
  int32_t      code = TSDB_CODE_SUCCESS;
×
888
  TdThreadAttr thAttr;
889
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
890
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
891
  TdThread crashReportThread;
892
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
893
    tscError("failed to create crashReport thread since %s", strerror(errno));
×
894
    terrno = TAOS_SYSTEM_ERROR(errno);
×
895
    TSC_ERR_RET(errno);
×
896
  }
897

898
  (void)taosThreadAttrDestroy(&thAttr);
×
899
_return:
×
900
  if (code) {
×
901
    terrno = TAOS_SYSTEM_ERROR(errno);
×
902
    TSC_ERR_RET(terrno);
×
903
  }
904

905
  return code;
×
906
}
907

908
void tscStopCrashReport() {
1,976✔
909
  if (!tsEnableCrashReport) {
1,976!
910
    return;
1,976✔
911
  }
912

913
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
914
    tscDebug("crash report thread already stopped");
×
915
    return;
×
916
  }
917

918
  while (atomic_load_32(&clientStop) > 0) {
×
919
    taosMsleep(100);
×
920
  }
921
}
922

923
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
×
924
  char       *pMsg = NULL;
×
925
  const char *flags = "UTL FATAL ";
×
926
  ELogLevel   level = DEBUG_FATAL;
×
927
  int32_t     dflag = 255;
×
928
  int64_t     msgLen = -1;
×
929

930
  if (tsEnableCrashReport) {
×
931
    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
×
932
      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
×
933
    } else {
934
      msgLen = strlen(pMsg);
×
935
    }
936
  }
937

938
  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
×
939
}
×
940

941
void taos_init_imp(void) {
1,976✔
942
#if defined(LINUX)
943
  if (tscDbg.memEnable) {
1,976!
944
    int32_t code = taosMemoryDbgInit();
×
945
    if (code) {
×
946
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
947
    } else {
948
      tsAsyncLog = false;
×
949
      (void)printf("memory dbg enabled\n");
×
950
    }
951
  }
952
#endif
953

954
  // In the APIs of other program language, taos_cleanup is not available yet.
955
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
956
  (void)atexit(taos_cleanup);
1,976✔
957
  errno = TSDB_CODE_SUCCESS;
1,976✔
958
  taosSeedRand(taosGetTimestampSec());
1,976✔
959

960
  appInfo.pid = taosGetPId();
1,976✔
961
  appInfo.startTime = taosGetTimestampMs();
1,976✔
962
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,976✔
963
  appInfo.pInstMapByClusterId =
1,976✔
964
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,976✔
965
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
1,976!
966
    (void)printf("failed to allocate memory when init appInfo\n");
×
967
    tscInitRes = terrno;
×
968
    return;
×
969
  }
970
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
1,976✔
971

972
  const char *logName = CUS_PROMPT "slog";
1,976✔
973
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
1,976!
974
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
1,976!
975
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(errno), configDir);
×
976
    tscInitRes = terrno;
×
977
    return;
×
978
  }
979

980
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
1,976!
981

982
  initQueryModuleMsgHandle();
1,976✔
983
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL){
1,976!
984
    tscInitRes = terrno;
×
985
    tscError("failed to init conv");
×
986
    return;
×
987
  }
988
#ifndef WINDOWS
989
  ENV_ERR_RET(tzInit(), "failed to init timezone");
1,976!
990
#endif
991
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
1,976!
992
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
1,976!
993

994
  if (InitRegexCache() != 0) {
1,976!
995
    tscInitRes = terrno;
×
996
    (void)printf("failed to init regex cache\n");
×
997
    return;
×
998
  }
999

1000
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
1,976✔
1001
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
1,976!
1002
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
1,976!
1003
  ENV_ERR_RET(initClientId(), "failed to init clientId");
1,976!
1004

1005
  tscDebug("starting to initialize TAOS driver");
1,976✔
1006

1007
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
1,976!
1008
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
1,976!
1009
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
1,976!
1010

1011
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
1,976✔
1012
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
1,976✔
1013

1014
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
1,976!
1015
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
1,976!
1016
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
1,976!
1017
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
1,976!
1018

1019
  tscDebug("client is initialized successfully");
1,976✔
1020
}
1021

1022
int taos_init() {
8,474✔
1023
  (void)taosThreadOnce(&tscinit, taos_init_imp);
8,474✔
1024
  return tscInitRes;
8,474✔
1025
}
1026

1027
// Map a TSDB_OPTION value to the name of its configuration item.
// Returns NULL for any option that has no entry here.
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      break;
  }

  return NULL;
}
1052

1053
// Apply one client option.
//
// TSDB_OPTION_CONFIGDIR is handled without initializing the client: str is copied
// into configDir (on non-Windows builds a value longer than one character that does
// not start with a quote character is wrapped in double quotes first).
// Every other option triggers taos_init(), is mapped to a configuration item name
// through getCfgName() and set with cfgSetItem(); for the shell activity timer and
// useAdapter options taosCfgDynamicOptions() is applied afterwards.
//
// Returns 0 on success, -1 for a NULL value, an over-long config directory, a failed
// taos_init() or an unknown option, otherwise the code returned by cfgSetItem() /
// taosCfgDynamicOptions().
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (str == NULL) {
    // strlen() and the %s log arguments below all require a valid string.
    tscError("Invalid option %d: value is NULL", option);
    return -1;
  }

  if (option == TSDB_OPTION_CONFIGDIR) {
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Two quote characters plus the terminator must still fit into newstr.
      if (len + 2 >= (size_t)PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    // Log the option name, not the directory value, in the name position.
    tscInfo("set cfg:%s to %s", "configDir", str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1107

1108
/**
1109
 * The request id is an unsigned integer format of 64bit.
1110
 *+------------+-----+-----------+---------------+
1111
 *| uid|localIp| PId | timestamp | serial number |
1112
 *+------------+-----+-----------+---------------+
1113
 *| 12bit      |12bit|24bit      |16bit          |
1114
 *+------------+-----+-----------+---------------+
1115
 * @return
1116
 */
1117
uint64_t generateRequestId() {
10,374,402✔
1118
  static uint32_t hashId = 0;
1119
  static int32_t  requestSerialId = 0;
1120

1121
  if (hashId == 0) {
10,374,402✔
1122
    int32_t code = taosGetSystemUUIDU32(&hashId);
1,967✔
1123
    if (code != TSDB_CODE_SUCCESS) {
1,967!
1124
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1125
               tstrerror(code));
1126
    }
1127
  }
1128

1129
  uint64_t id = 0;
10,374,587✔
1130

1131
  while (true) {
×
1132
    int64_t  ts = taosGetTimestampMs();
10,368,200✔
1133
    uint64_t pid = taosGetPId();
10,368,200✔
1134
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
10,367,514✔
1135
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
10,385,090✔
1136

1137
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
10,386,312✔
1138
    if (id) {
10,386,312!
1139
      break;
10,386,312✔
1140
    }
1141
  }
1142
  return id;
10,386,312✔
1143
}
1144

1145
#if 0
1146
#include "cJSON.h"
1147
static setConfRet taos_set_config_imp(const char *config){
1148
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1149
  static bool setConfFlag = false;
1150
  if (setConfFlag) {
1151
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1152
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1153
    return ret;
1154
  }
1155
  taosInitGlobalCfg();
1156
  cJSON *root = cJSON_Parse(config);
1157
  if (root == NULL){
1158
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1159
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1160
    return ret;
1161
  }
1162

1163
  int size = cJSON_GetArraySize(root);
1164
  if(!cJSON_IsObject(root) || size == 0) {
1165
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1166
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1167
    return ret;
1168
  }
1169

1170
  if(size >= 1000) {
1171
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1172
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1173
    return ret;
1174
  }
1175

1176
  for(int i = 0; i < size; i++){
1177
    cJSON *item = cJSON_GetArrayItem(root, i);
1178
    if(!item) {
1179
      ret.retCode = SET_CONF_RET_ERR_INNER;
1180
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1181
      return ret;
1182
    }
1183
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1184
      ret.retCode = SET_CONF_RET_ERR_PART;
1185
      if (strlen(ret.retMsg) == 0){
1186
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1187
      }else{
1188
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1189
        size_t leftSize = tmp >= 0 ? tmp : 0;
1190
        strncat(ret.retMsg, "|",  leftSize);
1191
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1192
        leftSize = tmp >= 0 ? tmp : 0;
1193
        strncat(ret.retMsg, item->string, leftSize);
1194
      }
1195
    }
1196
  }
1197
  cJSON_Delete(root);
1198
  setConfFlag = true;
1199
  return ret;
1200
}
1201

1202
setConfRet taos_set_config(const char *config){
1203
  taosThreadMutexLock(&setConfMutex);
1204
  setConfRet ret = taos_set_config_imp(config);
1205
  taosThreadMutexUnlock(&setConfMutex);
1206
  return ret;
1207
}
1208
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc