• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.12
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tglobal.h"
31
#include "thttp.h"
32
#include "tmsg.h"
33
#include "tqueue.h"
34
#include "tref.h"
35
#include "trpc.h"
36
#include "tsched.h"
37
#include "ttime.h"
38
#include "tversion.h"
39

40
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
41
#include "cus_name.h"
42
#endif
43

44
#ifndef CUS_PROMPT
45
#define CUS_PROMPT "tao"
46
#endif
47

48
#define TSC_VAR_NOT_RELEASE 1
49
#define TSC_VAR_RELEASED    0
50

51
/*
 * Bail out of JSON generation when a cJSON "add" expression evaluates to false.
 *
 * The macro logs an error, sets the caller's local `code` to
 * TSDB_CODE_TSC_FAIL_GENERATE_JSON and jumps to the caller's `_end` label, so
 * it may only be used in functions that provide both.
 * The argument is parenthesized so compound expressions are negated as a whole.
 */
#define ENV_JSON_FALSE_CHECK(c)                      \
  do {                                               \
    if (!(c)) {                                      \
      tscError("failed to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;       \
      goto _end;                                     \
    }                                                \
  } while (0)
59

60
/*
 * Evaluate `c` once; if it is not TSDB_CODE_SUCCESS, store it in errno and in
 * the global tscInitRes, log `info` through tscError and return from the
 * enclosing function (which therefore must return void).
 * The argument is parenthesized so compound expressions initialize _code as a whole.
 */
#define ENV_ERR_RET(c, info)          \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      errno = _code;                  \
      tscInitRes = _code;             \
      tscError(info);                 \
      return;                         \
    }                                 \
  } while (0)
70

71
STscDbg  tscDbg = {0};
72
SAppInfo appInfo;
73
int64_t  lastClusterId = 0;
74
int32_t  clientReqRefPool = -1;
75
int32_t  clientConnRefPool = -1;
76
int32_t  clientStop = -1;
77

78
int32_t timestampDeltaLimit = 900;  // s
79

80
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
81
volatile int32_t    tscInitRes = 0;
82

83
/*
 * Add a request to the request ref pool and bump the request counters.
 *
 * pRequest->self receives the ref id returned by taosAddRef(). The
 * per-connection counter (numOfReqs) is always incremented; the app-instance
 * summary counters are incremented only when pTscObj->pAppInfo is set.
 *
 * @param pRequest request to register (pRequest->pTscObj is read for logging)
 * @param pTscObj  connection object whose counters are updated
 * @return TSDB_CODE_SUCCESS, or terrno when taosAddRef() returns a negative id
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // The summary counters are 64-bit; keep them in 64-bit locals so the
    // logged values are not truncated once they exceed INT32_MAX.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ",QID:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}
107

108
/*
 * Join the names stored in `list` into `buf`, separated by ','.
 *
 * For every entry, anything up to and including the first '.' is skipped
 * before the name is appended. Processing stops early when an entry cannot be
 * fetched, when formatting fails, or when the accumulated length reaches `size`.
 *
 * @param list array whose elements are read as NUL-terminated strings
 * @param buf  destination; appended to with strncat, so it is expected to
 *             hold a valid (possibly empty) string on entry
 * @param size capacity used to bound the writes into buf
 */
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;
  int idx = 0;

  while (idx < taosArrayGetSize(list)) {
    char *name = taosArrayGet(list, idx);
    if (name == NULL) {
      tscError("get dbname failed, buf:%s", buf);
      return;
    }

    // keep only the part after the first '.', when there is one
    char *sep = strchr(name, '.');
    name = (sep == NULL) ? name : sep + 1;

    if (idx != 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      return;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      return;
    }

    idx++;
  }
}
51✔
136

137
/*
 * Build the JSON record for one slow request and hand it to the monitor queue.
 *
 * The record carries cluster id, start timestamp, request id, duration, result
 * code, request type, row count, (possibly truncated) SQL text, user, process
 * name/id, local FQDN and the database name(s).
 *
 * @param pTscObj  connection object; pAppInfo supplies cluster id and monitor settings
 * @param pRequest request being reported; sqlstr is temporarily cut at
 *                 tsSlowLogMaxLen and always restored before returning
 * @param reqType  slow-log type written to the "type" field
 * @param duration request duration; written divided by 1000 as "query_time"
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_FAILED,
 *         TSDB_CODE_TSC_FAIL_GENERATE_JSON, or the code returned by
 *         monitorPutData2MonitorQueue()
 */
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("[monitor] cJSON_CreateObject failed");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));

  if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Cut the SQL in place for the copy made by cJSON_CreateString, then put
    // the overwritten character back BEFORE checking the result, so a failed
    // add can no longer leave the request's SQL text permanently truncated.
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    const bool added = cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(added);
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));

  // "db": prefer the request's db list, then its current db, else empty.
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // the printed text is released here only when the queue call failed
    taosMemoryFree(value);
  }

_end:
  cJSON_Delete(json);
  return code;
}
224

225
/*
 * Decide whether a request is outside the slow-log "except" database.
 *
 * When the request has a current db (pDb), only that name is compared.
 * Otherwise every entry of dbList is compared after skipping everything up to
 * and including its first '.'.
 *
 * @param pRequest request whose pDb / dbList are inspected
 * @param exceptDb database name to compare against
 * @return false when a compared name equals exceptDb or a list entry cannot
 *         be fetched; true otherwise
 */
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (pRequest->pDb != NULL) {
    return 0 != strcmp(pRequest->pDb, exceptDb);
  }

  int idx = 0;
  while (idx < taosArrayGetSize(pRequest->dbList)) {
    char *name = taosArrayGet(pRequest->dbList, idx);
    if (name == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    if (0 == strcmp(name, exceptDb)) {
      return false;
    }
    ++idx;
  }

  return true;
}
246

247
/*
 * Undo registerRequest(): decrement the request counters, account the elapsed
 * time by statement type, emit monitor / slow-query logs when enabled, and
 * release the connection object via releaseTscObj().
 *
 * @param pRequest request being torn down; NULL is logged and ignored.
 *                 pRequest->pTscObj and its pAppInfo are dereferenced without
 *                 a NULL check.
 */
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  // currentRequests is a 64-bit counter: keep the result in 64 bits so the
  // logged value is not truncated.
  int64_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ",QID:0x%" PRIx64
           " elapsed:%.2f ms, "
           "current:%d, app current:%" PRId64,
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the allocator is acquired.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

  // Per-statement-type request log, only when monitoring is enabled.
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // Slow-query handling: duration is in microseconds, the threshold is scaled by 1000000.
  if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }

  releaseTscObj(pTscObj->id);
}
319

320
// todo close the transporter properly
321
/*
 * Close the RPC transporter of an app instance through rpcClose().
 * Does nothing when pAppInfo or its pTransporter is NULL; the pTransporter
 * field itself is left unchanged.
 */
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
329

330
/*
 * Callback installed as SRpcInit.rfp by openTransporter().
 *
 * @param code    result code of the message
 * @param msgType message type
 * @return true only when NEED_REDIRECT_ERROR(code) holds and msgType is not
 *         one of the TDMT_SCH_* types listed below
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (!(NEED_REDIRECT_ERROR(code))) {
    return false;
  }

  const bool isSchMsg = (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                         msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                         msgType == TDMT_SCH_DROP_TASK || msgType == TDMT_SCH_TASK_NOTIFY);
  return !isSchMsg;
}
342

343
// start timer for particular msgType
344
/*
 * @param code    not used by this function
 * @param msgType message type
 * @return true for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE, false otherwise
 */
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (msgType == TDMT_VND_SUBMIT) || (msgType == TDMT_VND_CREATE_TABLE);
}
350

351
// TODO refactor
352
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
2,910✔
353
  SRpcInit rpcInit;
354
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
2,910✔
355
  rpcInit.localPort = 0;
2,910✔
356
  rpcInit.label = "TSC";
2,910✔
357
  rpcInit.numOfThreads = tsNumOfRpcThreads;
2,910✔
358
  rpcInit.cfp = processMsgFromServer;
2,910✔
359
  rpcInit.rfp = clientRpcRfp;
2,910✔
360
  rpcInit.sessions = 1024;
2,910✔
361
  rpcInit.connType = TAOS_CONN_CLIENT;
2,910✔
362
  rpcInit.user = (char *)user;
2,910✔
363
  rpcInit.idleTime = tsShellActivityTimer * 1000;
2,910✔
364
  rpcInit.compressSize = tsCompressMsgSize;
2,910✔
365
  rpcInit.dfp = destroyAhandle;
2,910✔
366

367
  rpcInit.retryMinInterval = tsRedirectPeriod;
2,910✔
368
  rpcInit.retryStepFactor = tsRedirectFactor;
2,910✔
369
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
2,910✔
370
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
2,910✔
371

372
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
2,910✔
373
  connLimitNum = TMAX(connLimitNum, 10);
2,910✔
374
  connLimitNum = TMIN(connLimitNum, 1000);
2,910✔
375
  rpcInit.connLimitNum = connLimitNum;
2,910✔
376
  rpcInit.shareConnLimit = tsShareConnLimit;
2,910✔
377
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
2,910✔
378
  rpcInit.startReadTimer = 1;
2,910✔
379
  rpcInit.readTimeout = tsReadTimeout;
2,910✔
380

381
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
2,910✔
382
  if (TSDB_CODE_SUCCESS != code) {
2,910!
383
    tscError("invalid version string.");
×
384
    return code;
×
385
  }
386

387
  *pDnodeConn = rpcOpen(&rpcInit);
2,910✔
388
  if (*pDnodeConn == NULL) {
2,910!
389
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
390
    code = terrno;
×
391
  }
392

393
  return code;
2,910✔
394
}
395

396
/*
 * Walk the request hash and destroy every request that can still be acquired.
 * Each hash entry is read as an int64_t request ref id.
 */
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
10,327✔
410

411
/*
 * Walk the request hash and call taos_stop_query() on every request that can
 * still be acquired. Each hash entry is read as an int64_t request ref id.
 */
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
5✔
425

426
void destroyAppInst(void *info) {
×
427
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
×
428
  tscDebug("destroy app inst mgr %p", pAppInfo);
×
429

430
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
×
431
  if (TSDB_CODE_SUCCESS != code) {
×
432
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
433
  }
434

435
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
×
436

437
  code = taosThreadMutexUnlock(&appInfo.mutex);
×
438
  if (TSDB_CODE_SUCCESS != code) {
×
439
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
440
  }
441

442
  taosMemoryFreeClear(pAppInfo->instKey);
×
443
  closeTransporter(pAppInfo);
×
444

445
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
×
446
  if (TSDB_CODE_SUCCESS != code) {
×
447
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
448
  }
449

450
  taosArrayDestroy(pAppInfo->pQnodeList);
×
451
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
×
452
  if (TSDB_CODE_SUCCESS != code) {
×
453
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
454
  }
455

456
  taosMemoryFree(pAppInfo);
×
457
}
×
458

459
void destroyTscObj(void *pObj) {
10,327✔
460
  if (NULL == pObj) {
10,327!
461
    return;
×
462
  }
463

464
  STscObj *pTscObj = pObj;
10,327✔
465
  int64_t  tscId = pTscObj->id;
10,327✔
466
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
10,327✔
467

468
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
10,327✔
469
  hbDeregisterConn(pTscObj, connKey);
10,327✔
470

471
  destroyAllRequests(pTscObj->pRequests);
10,327✔
472
  taosHashCleanup(pTscObj->pRequests);
10,327✔
473

474
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
10,327✔
475
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
10,327✔
476
           pTscObj->pAppInfo->numOfConns);
477

478
  // In any cases, we should not free app inst here. Or an race condition rises.
479
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
10,327✔
480

481
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
10,327✔
482
  taosMemoryFree(pTscObj);
10,327✔
483

484
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
10,327✔
485
}
486

487
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
10,994✔
488
                     STscObj **pObj) {
489
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
10,994✔
490
  if (NULL == *pObj) {
10,994!
491
    return terrno;
×
492
  }
493

494
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
10,994✔
495
  if (NULL == (*pObj)->pRequests) {
10,994!
496
    taosMemoryFree(*pObj);
×
497
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
498
  }
499

500
  (*pObj)->connType = connType;
10,994✔
501
  (*pObj)->pAppInfo = pAppInfo;
10,994✔
502
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
10,994✔
503
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
10,994✔
504
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
10,994✔
505

506
  if (db != NULL) {
10,994!
507
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
10,994✔
508
  }
509

510
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
10,994!
511

512
  int32_t code = TSDB_CODE_SUCCESS;
10,994✔
513

514
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
10,994✔
515
  if ((*pObj)->id < 0) {
10,994!
516
    tscError("failed to add object to clientConnRefPool");
×
517
    code = terrno;
×
518
    taosMemoryFree(*pObj);
×
519
    return code;
×
520
  }
521

522
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
10,994✔
523

524
  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", (*pObj)->id, *pObj);
10,994✔
525
  return code;
10,994✔
526
}
527

528
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
19,433,593✔
529

530
void releaseTscObj(int64_t rid) {
19,463,578✔
531
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
19,463,578✔
532
  if (TSDB_CODE_SUCCESS != code) {
19,463,835!
533
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
534
  }
535
}
19,463,835✔
536

537
/*
 * Allocate a request object for connection `connId`, initialize it and
 * register it through registerRequest().
 *
 * @param connId   ref id of the connection object (passed to acquireTscObj)
 * @param type     stored in the request
 * @param reqid    request id to use; 0 means generate one
 * @param pRequest out: the new request on success, NULL on failure
 * @return TSDB_CODE_SUCCESS or an error code
 *
 * Cleanup on failure:
 *  - before the connection is attached to the request, the sync-query param,
 *    its semaphore and the acquired connection reference are released here;
 *  - after it is attached, doDestroyRequest() tears the request down. It only
 *    releases the connection reference through deregisterRequest(), which it
 *    calls when `self` is non-zero, so for a not-yet-registered request the
 *    reference is released here.
 */
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;
  bool             interSemInited = false;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }

  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // never fall through with a NULL interParam, even if terrno was not set
    code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
  interSemInited = true;
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    // a NULL db with terrno == 0 is not treated as an error
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    const bool registered = ((*pRequest)->self != 0);
    doDestroyRequest(*pRequest);
    if (!registered) {
      releaseTscObj(connId);
    }
  } else {
    if (interParam != NULL) {
      if (interSemInited && TSDB_CODE_SUCCESS != tsem_destroy(&interParam->sem)) {
        tscError("failed to destroy semaphore in pRequest");
      }
      taosMemoryFree(interParam);
    }
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
  }
  *pRequest = NULL;  // the request is gone on every failure path
  return code;
}
589

590
/*
 * Free every buffer owned by a request result info and clear the pointers.
 * convertBuf is an array with numOfCols entries; each entry is freed before
 * the array itself.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
10,971,926✔
607

608
/* Acquire the request with ref id `rid` from clientReqRefPool. */
SRequestObj *acquireRequest(int64_t rid) {
  SRequestObj *pReq = (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
  return pReq;
}
45,831,092✔
609

610
/* Release one reference on request `rid` in clientReqRefPool; returns taosReleaseRef()'s result. */
int32_t releaseRequest(int64_t rid) {
  const int32_t rc = taosReleaseRef(clientReqRefPool, rid);
  return rc;
}
46,375,940✔
611

612
/* Remove request `rid` from clientReqRefPool; returns taosRemoveRef()'s result. */
int32_t removeRequest(int64_t rid) {
  const int32_t rc = taosRemoveRef(clientReqRefPool, rid);
  return rc;
}
10,614,748✔
613

614
/// return the most previous req ref id
615
/*
 * Follow relation.prevRefId links from pRequest as far as requests can be
 * acquired, then remove the earliest request found from the ref pool.
 *
 * @return ref id of the request that was passed to removeRequest()
 */
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
  int64_t      headRefId = pRequest->self;
  SRequestObj *pCur = pRequest;

  for (;;) {
    if (!pCur->relation.prevRefId) {
      break;
    }
    pCur = acquireRequest(pCur->relation.prevRefId);
    if (NULL == pCur) {
      break;
    }
    headRefId = pCur->self;
    (void)releaseRequest(headRefId);  // ignore error
  }

  (void)removeRequest(headRefId);  // ignore error
  return headRefId;
}
630

631
void destroyNextReq(int64_t nextRefId) {
10,608,550✔
632
  if (nextRefId) {
10,608,550✔
633
    SRequestObj *pObj = acquireRequest(nextRefId);
516✔
634
    if (pObj) {
516!
635
      (void)releaseRequest(nextRefId);  // ignore error
516✔
636
      (void)releaseRequest(nextRefId);  // ignore error
516✔
637
    }
638
  }
639
}
10,608,550✔
640

641
/*
 * Remove every request linked to pRequest from the ref pool: first the chain
 * reachable through relation.prevRefId (removed starting from the earliest
 * one), then the chain reachable through relation.nextRefId.
 *
 * Nothing is done when pRequest has a userRefId that differs from its own id.
 * At most 16 previous requests are tracked; a longer chain is logged and cut
 * instead of writing past the end of the local array.
 */
void destroySubRequests(SRequestObj *pRequest) {
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t maxReqs = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  int32_t       reqNum = 0;
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
    if (reqNum >= maxReqs) {
      tscError("too many prev reqs, stop collecting at ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(tmpRefId);  // ignore error
      break;
    }
    pReqList[reqNum++] = pTmp;
    (void)releaseRequest(tmpRefId);  // ignore error
  }

  for (int32_t i = reqNum - 1; i >= 0; i--) {
    (void)removeRequest(pReqList[i]->self);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
    tmpRefId = pTmp->relation.nextRefId;
    (void)removeRequest(pTmp->self);   // ignore error
    (void)releaseRequest(pTmp->self);  // ignore error
  }
}
680

681
void doDestroyRequest(void *p) {
10,603,737✔
682
  if (NULL == p) {
10,603,737!
683
    return;
×
684
  }
685

686
  SRequestObj *pRequest = (SRequestObj *)p;
10,603,737✔
687

688
  uint64_t reqId = pRequest->requestId;
10,603,737✔
689
  tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
10,603,737✔
690

691
  int64_t nextReqRefId = pRequest->relation.nextRefId;
10,603,737✔
692

693
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
10,603,737✔
694
  if (TSDB_CODE_SUCCESS != code) {
10,593,040✔
695
    tscWarn("failed to remove request from hash, code:%s", tstrerror(code));
10,994!
696
  }
697
  schedulerFreeJob(&pRequest->body.queryJob, 0);
10,593,040✔
698

699
  destorySqlCallbackWrapper(pRequest->pWrapper);
10,615,038✔
700

701
  taosMemoryFreeClear(pRequest->msgBuf);
10,614,476!
702

703
  doFreeReqResultInfo(&pRequest->body.resInfo);
10,610,343✔
704
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
10,608,576!
705
    tscError("failed to destroy semaphore");
×
706
  }
707

708
  taosArrayDestroy(pRequest->tableList);
10,605,598✔
709
  taosArrayDestroy(pRequest->targetTableList);
10,604,883✔
710
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,603,875✔
711

712
  if (pRequest->self) {
10,605,716!
713
    deregisterRequest(pRequest);
10,605,947✔
714
  }
715

716
  taosMemoryFreeClear(pRequest->pDb);
10,616,182✔
717
  taosArrayDestroy(pRequest->dbList);
10,616,115✔
718
  if (pRequest->body.interParam) {
10,616,490!
719
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
10,616,493!
720
      tscError("failed to destroy semaphore in pRequest");
×
721
    }
722
  }
723
  taosMemoryFree(pRequest->body.interParam);
10,616,484✔
724

725
  qDestroyQuery(pRequest->pQuery);
10,616,497✔
726
  nodesDestroyAllocator(pRequest->allocatorRefId);
10,609,431✔
727

728
  taosMemoryFreeClear(pRequest->effectiveUser);
10,607,316✔
729
  taosMemoryFreeClear(pRequest->sqlstr);
10,607,316!
730
  taosMemoryFree(pRequest);
10,608,277✔
731
  tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
10,610,049✔
732
  destroyNextReq(nextReqRefId);
10,610,049✔
733
}
734

735
/*
 * Stop the request's query and remove the earliest request of its chain from
 * the ref pool. NULL is ignored.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
743

744
/*
 * Mark a request as killed and, when it carries a query whose execMode is
 * QUERY_EXEC_MODE_SCHEDULE, free its scheduler job with
 * TSDB_CODE_TSC_QUERY_KILLED.
 */
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  // It is not a query, no need to stop.
  const bool isScheduled = (NULL != pRequest->pQuery) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);
  if (!isScheduled) {
    tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
    return;
  }

  schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
  tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
756

757
/**
 * Stop a request together with every request linked to it through
 * relation.prevRefId / relation.nextRefId.
 *
 * Nothing is done when the request has a userRefId that differs from its own
 * ref id. Otherwise the prev chain is walked first; the collected requests are
 * stopped starting with the one farthest from pRequest, then pRequest itself,
 * then the next chain in forward order.
 *
 * @param pRequest request whose whole chain should be stopped; must not be NULL
 */
void stopAllQueries(SRequestObj *pRequest) {
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t maxDeferred = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  int32_t       reqNum = 0;
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  tmpRefId = pRequest->relation.prevRefId;
  while (tmpRefId) {
    SRequestObj *pPrev = acquireRequest(tmpRefId);
    if (NULL == pPrev) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    // Read the link before the request may be released below.
    tmpRefId = pPrev->relation.prevRefId;

    if (reqNum < maxDeferred) {
      pReqList[reqNum++] = pPrev;
    } else {
      // The deferral list is full: stop this request right away instead of
      // writing past the end of pReqList. Only the stop order of these
      // overflow entries differs from the deferred ones.
      taosStopQueryImpl(pPrev);
      (void)releaseRequest(pPrev->self);  // ignore error
    }
  }

  // Stop the collected prev requests, farthest from pRequest first.
  for (int32_t i = reqNum - 1; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    SRequestObj *pNext = acquireRequest(tmpRefId);
    if (NULL == pNext) {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    tmpRefId = pNext->relation.nextRefId;
    taosStopQueryImpl(pNext);
    (void)releaseRequest(pNext->self);  // ignore error
  }
}
798

799
// Atomically sets clientStop to -1.
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
800

801
static void *tscCrashReportThreadFp(void *param) {
×
802
  setThreadName("client-crashReport");
×
803
  char filepath[PATH_MAX] = {0};
×
804
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
805
  char     *pMsg = NULL;
×
806
  int64_t   msgLen = 0;
×
807
  TdFilePtr pFile = NULL;
×
808
  bool      truncateFile = false;
×
809
  int32_t   sleepTime = 200;
×
810
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
811
  int32_t   loopTimes = reportPeriodNum;
×
812

813
#ifdef WINDOWS
814
  if (taosCheckCurrentInDll()) {
815
    atexit(crashReportThreadFuncUnexpectedStopped);
816
  }
817
#endif
818

819
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
820
    return NULL;
×
821
  }
822

823
  while (1) {
824
    if (clientStop > 0) break;
×
825
    if (loopTimes++ < reportPeriodNum) {
×
826
      taosMsleep(sleepTime);
×
827
      continue;
×
828
    }
829

830
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
831
    if (pMsg && msgLen > 0) {
×
832
      if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
833
        tscError("failed to send crash report");
×
834
        if (pFile) {
×
835
          taosReleaseCrashLogFile(pFile, false);
×
836
          pFile = NULL;
×
837

838
          taosMsleep(sleepTime);
×
839
          loopTimes = 0;
×
840
          continue;
×
841
        }
842
      } else {
843
        tscInfo("succeed to send crash report");
×
844
        truncateFile = true;
×
845
      }
846
    } else {
847
      tscDebug("no crash info");
×
848
    }
849

850
    taosMemoryFree(pMsg);
×
851

852
    if (pMsg && msgLen > 0) {
×
853
      pMsg = NULL;
×
854
      continue;
×
855
    }
856

857
    if (pFile) {
×
858
      taosReleaseCrashLogFile(pFile, truncateFile);
×
859
      pFile = NULL;
×
860
      truncateFile = false;
×
861
    }
862

863
    taosMsleep(sleepTime);
×
864
    loopTimes = 0;
×
865
  }
866

867
  clientStop = -2;
×
868
  return NULL;
×
869
}
870

871
int32_t tscCrashReportInit() {
2,693✔
872
  if (!tsEnableCrashReport) {
2,693!
873
    return TSDB_CODE_SUCCESS;
2,693✔
874
  }
875
  int32_t      code = TSDB_CODE_SUCCESS;
×
876
  TdThreadAttr thAttr;
877
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
878
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
879
  TdThread crashReportThread;
880
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
881
    tscError("failed to create crashReport thread since %s", strerror(errno));
×
882
    terrno = TAOS_SYSTEM_ERROR(errno);
×
883
    TSC_ERR_RET(errno);
×
884
  }
885

886
  (void)taosThreadAttrDestroy(&thAttr);
×
887
_return:
×
888
  if (code) {
×
889
    terrno = TAOS_SYSTEM_ERROR(errno);
×
890
    TSC_ERR_RET(terrno);
×
891
  }
892

893
  return code;
×
894
}
895

896
void tscStopCrashReport() {
2,693✔
897
  if (!tsEnableCrashReport) {
2,693!
898
    return;
2,693✔
899
  }
900

901
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
902
    tscDebug("crash report thread already stopped");
×
903
    return;
×
904
  }
905

906
  while (atomic_load_32(&clientStop) > 0) {
×
907
    taosMsleep(100);
×
908
  }
909
}
910

911
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
×
912
  char       *pMsg = NULL;
×
913
  const char *flags = "UTL FATAL ";
×
914
  ELogLevel   level = DEBUG_FATAL;
×
915
  int32_t     dflag = 255;
×
916
  int64_t     msgLen = -1;
×
917

918
  if (tsEnableCrashReport) {
×
919
    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
×
920
      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
×
921
    } else {
922
      msgLen = strlen(pMsg);
×
923
    }
924
  }
925

926
  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
×
927
}
×
928

929
void taos_init_imp(void) {
2,693✔
930
#if defined(LINUX)
931
  if (tscDbg.memEnable) {
2,693!
932
    int32_t code = taosMemoryDbgInit();
×
933
    if (code) {
×
934
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
935
    } else {
936
      tsAsyncLog = false;
×
937
      (void)printf("memory dbg enabled\n");
×
938
    }
939
  }
940
#endif
941

942
  // In the APIs of other program language, taos_cleanup is not available yet.
943
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
944
  (void)atexit(taos_cleanup);
2,693✔
945
  errno = TSDB_CODE_SUCCESS;
2,693✔
946
  taosSeedRand(taosGetTimestampSec());
2,693✔
947

948
  appInfo.pid = taosGetPId();
2,693✔
949
  appInfo.startTime = taosGetTimestampMs();
2,693✔
950
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
2,693✔
951
  appInfo.pInstMapByClusterId =
2,693✔
952
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
2,693✔
953
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
2,693!
954
    (void)printf("failed to allocate memory when init appInfo\n");
×
955
    tscInitRes = TSDB_CODE_OUT_OF_MEMORY;
×
956
    return;
×
957
  }
958
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
2,693✔
959
  deltaToUtcInitOnce();
2,693✔
960

961
  const char *logName = CUS_PROMPT "slog";
2,693✔
962
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
2,693!
963
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
2,693!
964
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(errno), configDir);
×
965
    tscInitRes = -1;
×
966
    return;
×
967
  }
968

969
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
2,693!
970

971
  initQueryModuleMsgHandle();
2,693✔
972
  ENV_ERR_RET(taosConvInit(), "failed to init conv");
2,693!
973
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
2,693!
974
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
2,693!
975

976
  if (InitRegexCache() != 0) {
2,693!
977
    tscInitRes = -1;
×
978
    (void)printf("failed to init regex cache\n");
×
979
    return;
×
980
  }
981

982
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
2,693✔
983
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
2,693!
984
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
2,693!
985
  ENV_ERR_RET(initClientId(), "failed to init clientId");
2,693!
986

987
  tscDebug("starting to initialize TAOS driver");
2,693✔
988

989
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
2,693!
990
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
2,693!
991
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
2,693!
992

993
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
2,693✔
994
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
2,693✔
995

996
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
2,693!
997
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
2,693!
998
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
2,693!
999
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
2,693!
1000

1001
  tscDebug("client is initialized successfully");
2,693✔
1002
}
1003

1004
int taos_init() {
12,044✔
1005
  (void)taosThreadOnce(&tscinit, taos_init_imp);
12,044✔
1006
  return tscInitRes;
12,045✔
1007
}
1008

1009
/**
 * Map a TSDB_OPTION value to the name of the configuration item it controls.
 *
 * @param option option identifier
 * @return the configuration item name, or NULL for an option that has no
 *         mapping here
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      return NULL;
  }
}
1034

1035
/**
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR only copies the path into configDir (on non-Windows
 * builds a path longer than one character that does not start with a quote is
 * wrapped in double quotes first) and does not initialize the driver. Every
 * other option initializes the driver through taos_init(), resolves the
 * configuration item name with getCfgName() and stores the value with
 * cfgSetItem(); for the shell activity timer and the adapter switch
 * taosCfgDynamicOptions() is called afterwards.
 *
 * @param option option to set
 * @param str    option value; NULL is rejected
 * @return 0 on success, -1 for a NULL value, a too long config path, a failed
 *         driver init or an unknown option, otherwise the code returned by
 *         cfgSetItem() / taosCfgDynamicOptions()
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  // Every path below passes str to strlen() or to a "%s" format, so NULL can never be valid.
  if (str == NULL) {
    tscError("Invalid value for option %d: NULL", option);
    return -1;
  }

  if (option == TSDB_OPTION_CONFIGDIR) {
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Room is needed for the two quotes and the terminating NUL.
      if (len + 2 >= PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    // Log the option name rather than the path twice.
    tscInfo("set cfg:%s to %s", "configDir", str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1089

1090
/**
1091
 * The request id is an unsigned integer format of 64bit.
1092
 *+------------+-----+-----------+---------------+
1093
 *| uid|localIp| PId | timestamp | serial number |
1094
 *+------------+-----+-----------+---------------+
1095
 *| 12bit      |12bit|24bit      |16bit          |
1096
 *+------------+-----+-----------+---------------+
1097
 * @return
1098
 */
1099
uint64_t generateRequestId() {
10,702,326✔
1100
  static uint32_t hashId = 0;
1101
  static int32_t requestSerialId = 0;
1102

1103
  if (hashId == 0) {
10,702,326✔
1104
    int32_t code = taosGetSystemUUIDU32(&hashId);
2,686✔
1105
    if (code != TSDB_CODE_SUCCESS) {
2,686!
1106
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1107
               tstrerror(code));
1108
    }
1109
  }
1110

1111
  uint64_t id = 0;
10,702,891✔
1112

1113
  while (true) {
×
1114
    int64_t  ts = taosGetTimestampMs();
10,702,277✔
1115
    uint64_t pid = taosGetPId();
10,702,277✔
1116
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
10,694,219✔
1117
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
10,710,160✔
1118

1119
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
10,712,446✔
1120
    if (id) {
10,712,446!
1121
      break;
10,712,446✔
1122
    }
1123
  }
1124
  return id;
10,712,446✔
1125
}
1126

1127
#if 0
1128
#include "cJSON.h"
1129
static setConfRet taos_set_config_imp(const char *config){
1130
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1131
  static bool setConfFlag = false;
1132
  if (setConfFlag) {
1133
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1134
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1135
    return ret;
1136
  }
1137
  taosInitGlobalCfg();
1138
  cJSON *root = cJSON_Parse(config);
1139
  if (root == NULL){
1140
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1141
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1142
    return ret;
1143
  }
1144

1145
  int size = cJSON_GetArraySize(root);
1146
  if(!cJSON_IsObject(root) || size == 0) {
1147
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1148
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1149
    return ret;
1150
  }
1151

1152
  if(size >= 1000) {
1153
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1154
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1155
    return ret;
1156
  }
1157

1158
  for(int i = 0; i < size; i++){
1159
    cJSON *item = cJSON_GetArrayItem(root, i);
1160
    if(!item) {
1161
      ret.retCode = SET_CONF_RET_ERR_INNER;
1162
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1163
      return ret;
1164
    }
1165
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1166
      ret.retCode = SET_CONF_RET_ERR_PART;
1167
      if (strlen(ret.retMsg) == 0){
1168
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1169
      }else{
1170
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1171
        size_t leftSize = tmp >= 0 ? tmp : 0;
1172
        strncat(ret.retMsg, "|",  leftSize);
1173
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1174
        leftSize = tmp >= 0 ? tmp : 0;
1175
        strncat(ret.retMsg, item->string, leftSize);
1176
      }
1177
    }
1178
  }
1179
  cJSON_Delete(root);
1180
  setConfFlag = true;
1181
  return ret;
1182
}
1183

1184
setConfRet taos_set_config(const char *config){
1185
  taosThreadMutexLock(&setConfMutex);
1186
  setConfRet ret = taos_set_config_imp(config);
1187
  taosThreadMutexUnlock(&setConfMutex);
1188
  return ret;
1189
}
1190
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc