• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3646

12 Mar 2025 12:34PM UTC coverage: 28.375% (-27.8%) from 56.156%
#3646

push

travis-ci

web-flow
Merge pull request #30119 from taosdata/ciup30

ci: Update workflow to fix param issue of run_tdgpt_test

59085 of 286935 branches covered (20.59%)

Branch coverage included in aggregate %.

102775 of 283490 relevant lines covered (36.25%)

55149.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.87
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42

43
#define TSC_VAR_NOT_RELEASE 1
44
#define TSC_VAR_RELEASED    0
45

46
// Leaves the enclosing function through its `_end` label when `c` evaluates to
// false/NULL. The enclosing scope must provide a local `code` variable and an
// `_end` label. The argument is parenthesized so that a compound expression
// (for example `a || b`) is negated as a whole rather than only its first operand.
#define ENV_JSON_FALSE_CHECK(c)                     \
  do {                                              \
    if (!(c)) {                                     \
      tscError("faild to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;      \
      goto _end;                                    \
    }                                               \
  } while (0)
54

55
// Evaluates `c` once; when the result is not TSDB_CODE_SUCCESS it stores the
// code in errno and tscInitRes, logs `info`, and returns from the enclosing
// function (which therefore must return void).
// The argument is parenthesized so that any expression can be passed safely.
#define ENV_ERR_RET(c, info)          \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      errno = _code;                  \
      tscInitRes = _code;             \
      tscError(info);                 \
      return;                         \
    }                                 \
  } while (0)
65

66
STscDbg   tscDbg = {0};
67
SAppInfo  appInfo;
68
int64_t   lastClusterId = 0;
69
int32_t   clientReqRefPool = -1;
70
int32_t   clientConnRefPool = -1;
71
int32_t   clientStop = -1;
72
SHashObj *pTimezoneMap = NULL;
73

74
int32_t timestampDeltaLimit = 900;  // s
75

76
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
77
volatile int32_t    tscInitRes = 0;
78

79
// Adds the request to the request ref pool and bumps the per-connection and
// per-app-instance request counters.
//
// pRequest: request being registered; its `self` field receives the ref id.
// pTscObj:  connection object that owns the request.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the request could not be added to
// clientReqRefPool (no counter is touched in that case).
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // totalRequests only ever grows, so keep it in a 64-bit local instead of
    // truncating it to 32 bits for the log line.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("req:0x%" PRIx64 ", new from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%" PRId64
             ", QID:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}
102

103
// Joins the names stored in `list` into `buf`, separated by ','.
// For every entry, anything up to and including the first '.' is dropped.
// `size` is the capacity the caller allows to be used in `buf`; writing stops
// (with a log message) once that capacity is reached.
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;

  for (int idx = 0; idx < taosArrayGetSize(list); idx++) {
    char *name = taosArrayGet(list, idx);
    if (name == NULL) {
      tscError("get dbname failed, buf:%s", buf);
      return;
    }

    // keep only the part after the first '.', when there is one
    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    // separator between consecutive names
    if (idx != 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      return;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      return;
    }
  }
}
8✔
131

132
// Builds the JSON slow-log record for `pRequest` and hands its serialized form
// to the monitor queue.
//
// pTscObj:  connection the request belongs to (cluster id, user and monitor
//           settings are read from it).
// pRequest: the finished request being reported.
// reqType:  slow-log type value stored under "type".
// duration: request duration in microseconds; stored in milliseconds.
//
// Returns TSDB_CODE_SUCCESS when the record was queued, otherwise an error
// code. On success the serialized string is owned by the monitor queue.
//
// Items are added with cJSON_AddStringToObject/cJSON_AddNumberToObject, which
// release the freshly created item themselves when the add fails, so no item
// is leaked on an error path.
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("failed to create monitor json");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "cluster_id", clusterId));
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "start_ts", startTs));
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "request_id", requestId));
  ENV_JSON_FALSE_CHECK(cJSON_AddNumberToObject(json, "query_time", duration / 1000));
  ENV_JSON_FALSE_CHECK(cJSON_AddNumberToObject(json, "code", pRequest->code));
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "error_info", tstrerror(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddNumberToObject(json, "type", reqType));
  ENV_JSON_FALSE_CHECK(cJSON_AddNumberToObject(
      json, "rows_num", pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
  if (pRequest->sqlstr != NULL &&
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Temporarily cut the SQL text at the configured maximum length. The
    // original character is put back BEFORE the result is checked, so an
    // error exit can never leave the request's SQL string truncated.
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    cJSON *sqlItem = cJSON_AddStringToObject(json, "sql", pRequest->sqlstr);
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(sqlItem);
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "sql", pRequest->sqlstr));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "user", pTscObj->user));
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "process_name", appInfo.appName));
  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "ip", tsLocalFqdn));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "process_id", pid));
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "db", dbList));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "db", pRequest->pDb));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddStringToObject(json, "db", ""));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // the queue did not take the string, so it is released here
    taosMemoryFree(value);
    goto _end;
  }

_end:
  cJSON_Delete(json);
  return code;
}
220

221
// Tells whether a request may be reported as slow given the excluded database
// name `exceptDb`.
// When the request has a current database (pDb), only that name is compared.
// Otherwise every entry of dbList is compared, using the part after the first
// '.' when an entry contains one.
// Returns false when the excluded database is involved (or a list entry cannot
// be read), true otherwise.
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (pRequest->pDb != NULL) {
    return strcmp(pRequest->pDb, exceptDb) != 0;
  }

  for (int idx = 0; idx < taosArrayGetSize(pRequest->dbList); idx++) {
    char *entry = taosArrayGet(pRequest->dbList, idx);
    if (entry == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    char *sep = strchr(entry, '.');
    char *name = (sep != NULL) ? sep + 1 : entry;
    if (0 == strcmp(name, exceptDb)) {
      return false;
    }
  }

  return true;
}
242

243
// Book-keeping performed when a registered request is destroyed:
//  - decrements the per-app and per-connection in-flight request counters,
//  - adds the request duration to the insert/query elapsed-time statistics,
//  - emits monitor events and, for slow requests, the slow log,
//  - releases the connection reference through releaseTscObj().
// A NULL request is logged and ignored.
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("req:0x%" PRIx64 ", free from connObj:0x%" PRIx64 ", QID:0x%" PRIx64
           ", elapsed:%.2f ms, current:%d, app current:%d",
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the allocator is acquired
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // Convert the threshold (seconds) to microseconds in signed 64-bit
  // arithmetic. Multiplying by an `unsigned long` constant would force an
  // unsigned comparison (a negative duration would then look huge) and would
  // overflow where `unsigned long` is 32 bits wide.
  int64_t slowThresholdUs = (int64_t)pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000LL;
  if ((duration >= slowThresholdUs) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }

  releaseTscObj(pTscObj->id);
}
314

315
// todo close the transporter properly
316
// Closes the RPC transporter of an app instance, if it has one.
// A NULL instance or an instance without a transporter is ignored.
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
324

325
// Callback installed as `rfp` in openTransporter().
// Returns true when `code` is a redirect error (NEED_REDIRECT_ERROR) and the
// message is not one of the TDMT_SCH_* types listed below; false otherwise.
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    bool isSchMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                    msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                    msgType == TDMT_SCH_DROP_TASK || msgType == TDMT_SCH_TASK_NOTIFY;
    return !isSchMsg;
  }

  return false;
}
337

338
// start timer for particular msgType
339
// Returns true only for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE messages;
// `code` is not consulted.
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE;
}
345

346
// TODO refactor
347
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
48✔
348
  SRpcInit rpcInit;
349
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
48✔
350
  rpcInit.localPort = 0;
48✔
351
  rpcInit.label = "TSC";
48✔
352
  rpcInit.numOfThreads = tsNumOfRpcThreads;
48✔
353
  rpcInit.cfp = processMsgFromServer;
48✔
354
  rpcInit.rfp = clientRpcRfp;
48✔
355
  rpcInit.sessions = 1024;
48✔
356
  rpcInit.connType = TAOS_CONN_CLIENT;
48✔
357
  rpcInit.user = (char *)user;
48✔
358
  rpcInit.idleTime = tsShellActivityTimer * 1000;
48✔
359
  rpcInit.compressSize = tsCompressMsgSize;
48✔
360
  rpcInit.dfp = destroyAhandle;
48✔
361

362
  rpcInit.retryMinInterval = tsRedirectPeriod;
48✔
363
  rpcInit.retryStepFactor = tsRedirectFactor;
48✔
364
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
48✔
365
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
48✔
366

367
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
48✔
368
  connLimitNum = TMAX(connLimitNum, 10);
48✔
369
  connLimitNum = TMIN(connLimitNum, 1000);
48✔
370
  rpcInit.connLimitNum = connLimitNum;
48✔
371
  rpcInit.shareConnLimit = tsShareConnLimit;
48✔
372
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
48✔
373
  rpcInit.startReadTimer = 1;
48✔
374
  rpcInit.readTimeout = tsReadTimeout;
48✔
375

376
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
48✔
377
  if (TSDB_CODE_SUCCESS != code) {
48!
378
    tscError("invalid version string.");
×
379
    return code;
×
380
  }
381

382
  *pDnodeConn = rpcOpen(&rpcInit);
48✔
383
  if (*pDnodeConn == NULL) {
48!
384
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
385
    code = terrno;
×
386
  }
387

388
  return code;
48✔
389
}
390

391
// Walks the request hash and calls destroyRequest() on every request whose ref
// id can still be acquired.
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRid = pIter;
    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
170✔
405

406
// Walks the request hash and calls taos_stop_query() on every request whose
// ref id can still be acquired.
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRid = pIter;
    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
×
420

421
void destroyAppInst(void *info) {
48✔
422
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
48✔
423
  tscDebug("destroy app inst mgr %p", pAppInfo);
48✔
424

425
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
48✔
426
  if (TSDB_CODE_SUCCESS != code) {
48!
427
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
428
  }
429

430
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
48✔
431

432
  code = taosThreadMutexUnlock(&appInfo.mutex);
48✔
433
  if (TSDB_CODE_SUCCESS != code) {
48!
434
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
435
  }
436

437
  taosMemoryFreeClear(pAppInfo->instKey);
48!
438
  closeTransporter(pAppInfo);
48✔
439

440
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
48✔
441
  if (TSDB_CODE_SUCCESS != code) {
48!
442
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
443
  }
444

445
  taosArrayDestroy(pAppInfo->pQnodeList);
48✔
446
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
48✔
447
  if (TSDB_CODE_SUCCESS != code) {
48!
448
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
449
  }
450

451
  taosMemoryFree(pAppInfo);
48!
452
}
48✔
453

454
void destroyTscObj(void *pObj) {
170✔
455
  if (NULL == pObj) {
170!
456
    return;
×
457
  }
458

459
  STscObj *pTscObj = pObj;
170✔
460
  int64_t  tscId = pTscObj->id;
170✔
461
  tscTrace("connObj:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
170!
462

463
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
170✔
464
  hbDeregisterConn(pTscObj, connKey);
170✔
465

466
  destroyAllRequests(pTscObj->pRequests);
170✔
467
  taosHashCleanup(pTscObj->pRequests);
170✔
468

469
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
170✔
470
  tscDebug("connObj:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
170✔
471
           pTscObj->pAppInfo->numOfConns);
472

473
  // In any cases, we should not free app inst here. Or an race condition rises.
474
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
170✔
475

476
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
170✔
477
  taosMemoryFree(pTscObj);
170!
478

479
  tscTrace("connObj:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
170!
480
}
481

482
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
178✔
483
                     STscObj **pObj) {
484
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
178!
485
  if (NULL == *pObj) {
178!
486
    return terrno;
×
487
  }
488

489
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
178✔
490
  if (NULL == (*pObj)->pRequests) {
178!
491
    taosMemoryFree(*pObj);
×
492
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
493
  }
494

495
  (*pObj)->connType = connType;
178✔
496
  (*pObj)->pAppInfo = pAppInfo;
178✔
497
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
178✔
498
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
178✔
499
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
178✔
500

501
  if (db != NULL) {
178!
502
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
178✔
503
  }
504

505
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
178!
506

507
  int32_t code = TSDB_CODE_SUCCESS;
178✔
508

509
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
178✔
510
  if ((*pObj)->id < 0) {
178!
511
    tscError("failed to add object to clientConnRefPool");
×
512
    code = terrno;
×
513
    taosMemoryFree(*pObj);
×
514
    return code;
×
515
  }
516

517
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
178✔
518

519
  tscInfo("connObj:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
178!
520
  return code;
178✔
521
}
522

523
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
336,914✔
524

525
void releaseTscObj(int64_t rid) {
336,887✔
526
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
336,887✔
527
  if (TSDB_CODE_SUCCESS != code) {
336,887!
528
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
529
  }
530
}
336,887✔
531

532
// Creates a request object bound to the connection `connId` and registers it.
//
// connId:   ref id of the connection (acquired here; the reference is held by
//           the request until it is deregistered).
// type:     request type stored in the object.
// reqid:    request id to use; 0 means "generate one".
// pRequest: receives the new request on success and NULL on failure.
//
// Returns TSDB_CODE_SUCCESS or an error code. Every failure path releases what
// was acquired before the failure: the sync-query parameter block and its
// semaphore, the connection reference, and the request itself.
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;
  bool             interSemInited = false;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }
  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // never fall through with a NULL parameter block, even if terrno is unset
    code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
  interSemInited = true;
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    // doDestroyRequest() only releases the connection reference (through
    // deregisterRequest) when `self` is non-zero, so release it here when the
    // request never reached registerRequest().
    bool registered = ((*pRequest)->self != 0);
    doDestroyRequest(*pRequest);
    if (!registered) {
      releaseTscObj(connId);
    }
  } else {
    // The request is not bound to the connection yet: undo the partial setup by hand.
    if (interSemInited) {
      (void)tsem_destroy(&interParam->sem);
    }
    taosMemoryFree(interParam);
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
  }
  *pRequest = NULL;
  return code;
}
585

586
// Frees every buffer owned by a result-info structure and clears the pointers.
// The structure itself is not freed.
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  // one conversion buffer per column, then the array holding them
  for (int32_t col = 0; col < pResInfo->numOfCols; col++) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
167,967✔
603

604
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
756,369✔
605

606
// Releases one reference on request `rid` in clientReqRefPool and returns the
// result of taosReleaseRef().
int32_t releaseRequest(int64_t rid) {
  int32_t rc = taosReleaseRef(clientReqRefPool, rid);
  return rc;
}
734,723✔
607

608
// Removes request `rid` from clientReqRefPool and returns the result of
// taosRemoveRef().
int32_t removeRequest(int64_t rid) {
  int32_t rc = taosRemoveRef(clientReqRefPool, rid);
  return rc;
}
167,590✔
609

610
/// return the most previous req ref id
611
// Follows the prevRefId links starting at `pRequest` as far as the requests
// can still be acquired, removes the last request reached from the ref pool
// and returns its ref id. Without a previous request this is `pRequest` itself.
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
  int64_t rootRefId = pRequest->self;

  for (SRequestObj *pCur = pRequest; pCur->relation.prevRefId;) {
    pCur = acquireRequest(pCur->relation.prevRefId);
    if (pCur == NULL) {
      break;
    }

    rootRefId = pCur->self;
    (void)releaseRequest(rootRefId);  // ignore error
  }

  (void)removeRequest(rootRefId);  // ignore error
  return rootRefId;
}
626

627
// For a non-zero `nextRefId` that can still be acquired, releases the
// reference twice: once for the acquire done here and once more on top of it.
void destroyNextReq(int64_t nextRefId) {
  if (0 == nextRefId) {
    return;
  }

  if (acquireRequest(nextRefId) == NULL) {
    return;
  }

  (void)releaseRequest(nextRefId);  // ignore error
  (void)releaseRequest(nextRefId);  // ignore error
}
167,590✔
636

637
// Removes every request linked to `pRequest` from the ref pool: first the
// chain reached through prevRefId (farthest request first), then the chain
// reached through nextRefId.
// Does nothing when `pRequest` is not the user-level request of its chain
// (userRefId set and different from `self`).
//
// At most MAX_PREV_REQS previous requests are handled; a longer chain is
// reported and cut instead of writing past the end of the local array.
// Request fields are read only while the reference is held.
void destroySubRequests(SRequestObj *pRequest) {
  enum { MAX_PREV_REQS = 16 };

  int32_t  numOfPrev = 0;
  int64_t  prevRefIds[MAX_PREV_REQS] = {0};
  uint64_t tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  tmpRefId = pRequest->relation.prevRefId;
  while (tmpRefId) {
    SRequestObj *pTmp = acquireRequest(tmpRefId);
    if (pTmp == NULL) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    if (numOfPrev >= MAX_PREV_REQS) {
      tscError("too many prev reqs, stop at req ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(tmpRefId);  // ignore error
      break;
    }

    prevRefIds[numOfPrev++] = pTmp->self;
    uint64_t nextPrevRefId = pTmp->relation.prevRefId;
    (void)releaseRequest(tmpRefId);  // ignore error
    tmpRefId = nextPrevRefId;
  }

  // remove the farthest previous request first
  for (int32_t i = numOfPrev - 1; i >= 0; i--) {
    (void)removeRequest(prevRefIds[i]);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    SRequestObj *pTmp = acquireRequest(tmpRefId);
    if (pTmp == NULL) {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    int64_t selfRefId = pTmp->self;
    tmpRefId = pTmp->relation.nextRefId;
    (void)removeRequest(selfRefId);   // ignore error
    (void)releaseRequest(selfRefId);  // ignore error
  }
}
676

677
void doDestroyRequest(void *p) {
167,590✔
678
  if (NULL == p) {
167,590!
679
    return;
×
680
  }
681

682
  SRequestObj *pRequest = (SRequestObj *)p;
167,590✔
683

684
  uint64_t reqId = pRequest->requestId;
167,590✔
685
  tscDebug("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
167,590✔
686

687
  int64_t nextReqRefId = pRequest->relation.nextRefId;
167,590✔
688

689
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
167,590✔
690
  if (TSDB_CODE_SUCCESS != code) {
167,590✔
691
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
178✔
692
  }
693
  schedulerFreeJob(&pRequest->body.queryJob, 0);
167,590✔
694

695
  destorySqlCallbackWrapper(pRequest->pWrapper);
167,590✔
696

697
  taosMemoryFreeClear(pRequest->msgBuf);
167,590!
698

699
  doFreeReqResultInfo(&pRequest->body.resInfo);
167,590✔
700
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
167,590!
701
    tscError("failed to destroy semaphore");
×
702
  }
703

704
  taosArrayDestroy(pRequest->tableList);
167,590✔
705
  taosArrayDestroy(pRequest->targetTableList);
167,590✔
706
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
167,590✔
707

708
  if (pRequest->self) {
167,590!
709
    deregisterRequest(pRequest);
167,590✔
710
  }
711

712
  taosMemoryFreeClear(pRequest->pDb);
167,590!
713
  taosArrayDestroy(pRequest->dbList);
167,590✔
714
  if (pRequest->body.interParam) {
167,590!
715
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
167,590!
716
      tscError("failed to destroy semaphore in pRequest");
×
717
    }
718
  }
719
  taosMemoryFree(pRequest->body.interParam);
167,590!
720

721
  qDestroyQuery(pRequest->pQuery);
167,590✔
722
  nodesDestroyAllocator(pRequest->allocatorRefId);
167,590✔
723

724
  taosMemoryFreeClear(pRequest->effectiveUser);
167,590!
725
  taosMemoryFreeClear(pRequest->sqlstr);
167,590!
726
  taosMemoryFree(pRequest);
167,590!
727
  tscDebug("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
167,590✔
728
  destroyNextReq(nextReqRefId);
167,590✔
729
}
730

731
// Stops the query of `pRequest` and removes the root of its request chain from
// the ref pool. A NULL request is ignored.
void destroyRequest(SRequestObj *pRequest) {
  if (pRequest != NULL) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
737

738
// Marks the request as killed and, when it carries a query executed in
// QUERY_EXEC_MODE_SCHEDULE, frees its scheduler job with
// TSDB_CODE_TSC_QUERY_KILLED.
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  bool isScheduledQuery = (pRequest->pQuery != NULL) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);
  if (isScheduledQuery) {
    schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
    tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
  } else {
    // It is not a query, no need to stop.
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
  }
}
750

751
/**
 * Stop the given request together with the requests linked to it through
 * relation.prevRefId and relation.nextRefId.
 *
 * Nothing happens when the request names another request as its user request
 * (relation.userRefId is set and differs from pRequest->self).
 *
 * Previous requests are acquired first and then stopped starting with the one
 * farthest from pRequest; after that pRequest itself is stopped, followed by
 * the next requests in chain order. Every acquired request is released again.
 *
 * At most MAX_PREV_REQS previous requests are collected. A longer (or cyclic)
 * prev chain is reported and the walk stops there instead of writing past the
 * end of the local list.
 *
 * @param pRequest request whose chain should be stopped; must not be NULL.
 */
void stopAllQueries(SRequestObj *pRequest) {
  enum { MAX_PREV_REQS = 16 };

  int32_t      reqNum = 0;
  SRequestObj *pReqList[MAX_PREV_REQS] = {NULL};
  uint64_t     tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    if (reqNum >= MAX_PREV_REQS) {
      // The list is full: give the reference back rather than overrunning pReqList.
      tscError("too many prev reqs, stop collecting at req ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(pTmp->self);  // ignore error
      break;
    }

    pReqList[reqNum++] = pTmp;
  }

  // Walk the collected list backwards so the farthest previous request is stopped first.
  for (int32_t i = reqNum - 1; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the follow-up id before the request is released.
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
792

793
/**
 * Atomically store -1 into clientStop.
 *
 * On WINDOWS builds the crash-report thread registers this function with
 * atexit() when taosCheckCurrentInDll() reports true.
 */
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
794

795
static void *tscCrashReportThreadFp(void *param) {
×
796
  int32_t code = 0;
×
797
  setThreadName("client-crashReport");
×
798
  char filepath[PATH_MAX] = {0};
×
799
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
800
  char     *pMsg = NULL;
×
801
  int64_t   msgLen = 0;
×
802
  TdFilePtr pFile = NULL;
×
803
  bool      truncateFile = false;
×
804
  int32_t   sleepTime = 200;
×
805
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
806
  int32_t   loopTimes = reportPeriodNum;
×
807

808
#ifdef WINDOWS
809
  if (taosCheckCurrentInDll()) {
810
    atexit(crashReportThreadFuncUnexpectedStopped);
811
  }
812
#endif
813

814
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
815
    return NULL;
×
816
  }
817
  STelemAddrMgmt mgt;
818
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
819
  if (code) {
×
820
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
821
    return NULL;
×
822
  }
823

824
  code = initCrashLogWriter();
×
825
  if (code) {
×
826
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
827
    return NULL;
×
828
  }
829

830
  while (1) {
831
    checkAndPrepareCrashInfo();
×
832
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
833
    if (loopTimes++ < reportPeriodNum) {
×
834
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
835
      taosMsleep(sleepTime);
×
836
      continue;
×
837
    }
838

839
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
840
    if (pMsg && msgLen > 0) {
×
841
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
842
        tscError("failed to send crash report");
×
843
        if (pFile) {
×
844
          taosReleaseCrashLogFile(pFile, false);
×
845
          pFile = NULL;
×
846

847
          taosMsleep(sleepTime);
×
848
          loopTimes = 0;
×
849
          continue;
×
850
        }
851
      } else {
852
        tscInfo("succeed to send crash report");
×
853
        truncateFile = true;
×
854
      }
855
    } else {
856
      tscInfo("no crash info was found");
×
857
    }
858

859
    taosMemoryFree(pMsg);
×
860

861
    if (pMsg && msgLen > 0) {
×
862
      pMsg = NULL;
×
863
      continue;
×
864
    }
865

866
    if (pFile) {
×
867
      taosReleaseCrashLogFile(pFile, truncateFile);
×
868
      pFile = NULL;
×
869
      truncateFile = false;
×
870
    }
871

872
    taosMsleep(sleepTime);
×
873
    loopTimes = 0;
×
874
  }
875
  taosTelemetryDestroy(&mgt);
×
876

877
  clientStop = -2;
×
878
  return NULL;
×
879
}
880

881
int32_t tscCrashReportInit() {
17✔
882
  if (!tsEnableCrashReport) {
17!
883
    return TSDB_CODE_SUCCESS;
17✔
884
  }
885
  int32_t      code = TSDB_CODE_SUCCESS;
×
886
  TdThreadAttr thAttr;
887
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
888
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
889
  TdThread crashReportThread;
890
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
891
    tscError("failed to create crashReport thread since %s", strerror(errno));
×
892
    terrno = TAOS_SYSTEM_ERROR(errno);
×
893
    TSC_ERR_RET(errno);
×
894
  }
895

896
  (void)taosThreadAttrDestroy(&thAttr);
×
897
_return:
×
898
  if (code) {
×
899
    terrno = TAOS_SYSTEM_ERROR(errno);
×
900
    TSC_ERR_RET(terrno);
×
901
  }
902

903
  return code;
×
904
}
905

906
void tscStopCrashReport() {
17✔
907
  if (!tsEnableCrashReport) {
17!
908
    return;
17✔
909
  }
910

911
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
912
    tscDebug("crash report thread already stopped");
×
913
    return;
×
914
  }
915

916
  while (atomic_load_32(&clientStop) > 0) {
×
917
    taosMsleep(100);
×
918
  }
919
}
920

921
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
×
922
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
923
}
×
924

925
void taos_init_imp(void) {
17✔
926
#if defined(LINUX)
927
  if (tscDbg.memEnable) {
17!
928
    int32_t code = taosMemoryDbgInit();
×
929
    if (code) {
×
930
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
931
    } else {
932
      tsAsyncLog = false;
×
933
      (void)printf("memory dbg enabled\n");
×
934
    }
935
  }
936
#endif
937

938
  // In the APIs of other program language, taos_cleanup is not available yet.
939
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
940
  (void)atexit(taos_cleanup);
17✔
941
  errno = TSDB_CODE_SUCCESS;
17✔
942
  taosSeedRand(taosGetTimestampSec());
17✔
943

944
  appInfo.pid = taosGetPId();
17✔
945
  appInfo.startTime = taosGetTimestampMs();
17✔
946
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
17✔
947
  appInfo.pInstMapByClusterId =
17✔
948
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
17✔
949
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
17!
950
    (void)printf("failed to allocate memory when init appInfo\n");
×
951
    tscInitRes = terrno;
×
952
    return;
×
953
  }
954
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
17✔
955

956
  const char *logName = CUS_PROMPT "log";
17✔
957
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
17!
958
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
17!
959
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(errno), configDir);
×
960
    tscInitRes = terrno;
×
961
    return;
×
962
  }
963

964
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
17!
965

966
  initQueryModuleMsgHandle();
17✔
967
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
17!
968
    tscInitRes = terrno;
×
969
    tscError("failed to init conv");
×
970
    return;
×
971
  }
972
#ifndef WINDOWS
973
  ENV_ERR_RET(tzInit(), "failed to init timezone");
17!
974
#endif
975
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
17!
976
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
17!
977

978
  if (InitRegexCache() != 0) {
17!
979
    tscInitRes = terrno;
×
980
    (void)printf("failed to init regex cache\n");
×
981
    return;
×
982
  }
983

984
  tscInfo("starting to initialize TAOS driver");
17!
985

986
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
17✔
987
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
17!
988
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
17!
989
  ENV_ERR_RET(initClientId(), "failed to init clientId");
17!
990

991
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
17!
992
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
17!
993
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
17!
994

995
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
17✔
996
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
17✔
997

998
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
17!
999
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
17!
1000
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
17!
1001
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
17!
1002

1003
  tscInfo("TAOS driver is initialized successfully");
17!
1004
}
1005

1006
int taos_init() {
177✔
1007
  (void)taosThreadOnce(&tscinit, taos_init_imp);
177✔
1008
  return tscInitRes;
178✔
1009
}
1010

1011
/**
 * Map a TSDB_OPTION value to the name of the configuration item it sets.
 *
 * @param option option identifier.
 * @return the configuration item name, or NULL for an option that has no
 *         entry in this table.
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      return NULL;
  }
}
1036

1037
/**
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR is handled before any initialisation: the value is
 * copied into configDir. On non-WINDOWS builds a value longer than one
 * character that does not start with a quote character is wrapped in double
 * quotes first.
 *
 * Every other option triggers taos_init() and is then written to the
 * configuration item named by getCfgName(). For the shell activity timer and
 * use-adapter options taosCfgDynamicOptions() is called afterwards.
 *
 * @param option option identifier.
 * @param str    option value as text; NULL is rejected.
 * @return 0 on success; -1 for a NULL value, an over-long config path, a
 *         failed taos_init() or an unknown option; otherwise the code
 *         returned by cfgSetItem() / taosCfgDynamicOptions().
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (str == NULL) {
    // strlen(), the copy below and the "%s" log arguments all require a real string.
    tscError("Invalid option %d, value is NULL", option);
    return -1;
  }

  if (option == TSDB_OPTION_CONFIGDIR) {
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Two quote characters plus the terminator must still fit into newstr.
      if (len + 2 >= (size_t)PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    // Log the option name and the stored value, not the value twice.
    tscInfo("set cfg:%s to %s", "configDir", configDir);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1091

1092
/**
1093
 * The request id is an unsigned integer format of 64bit.
1094
 *+------------+-----+-----------+---------------+
1095
 *| uid|localIp| PId | timestamp | serial number |
1096
 *+------------+-----+-----------+---------------+
1097
 *| 12bit      |12bit|24bit      |16bit          |
1098
 *+------------+-----+-----------+---------------+
1099
 * @return
1100
 */
1101
uint64_t generateRequestId() {
168,894✔
1102
  static uint32_t hashId = 0;
1103
  static int32_t  requestSerialId = 0;
1104

1105
  if (hashId == 0) {
168,894✔
1106
    int32_t code = taosGetSystemUUIDU32(&hashId);
17✔
1107
    if (code != TSDB_CODE_SUCCESS) {
17!
1108
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1109
               tstrerror(code));
1110
    }
1111
  }
1112

1113
  uint64_t id = 0;
168,894✔
1114

1115
  while (true) {
×
1116
    int64_t  ts = taosGetTimestampMs();
168,894✔
1117
    uint64_t pid = taosGetPId();
168,894✔
1118
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
168,894✔
1119
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
168,894✔
1120

1121
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
168,894✔
1122
    if (id) {
168,894!
1123
      break;
168,894✔
1124
    }
1125
  }
1126
  return id;
168,894✔
1127
}
1128

1129
#if 0
1130
#include "cJSON.h"
1131
static setConfRet taos_set_config_imp(const char *config){
1132
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1133
  static bool setConfFlag = false;
1134
  if (setConfFlag) {
1135
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1136
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1137
    return ret;
1138
  }
1139
  taosInitGlobalCfg();
1140
  cJSON *root = cJSON_Parse(config);
1141
  if (root == NULL){
1142
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1143
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1144
    return ret;
1145
  }
1146

1147
  int size = cJSON_GetArraySize(root);
1148
  if(!cJSON_IsObject(root) || size == 0) {
1149
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1150
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1151
    return ret;
1152
  }
1153

1154
  if(size >= 1000) {
1155
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1156
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1157
    return ret;
1158
  }
1159

1160
  for(int i = 0; i < size; i++){
1161
    cJSON *item = cJSON_GetArrayItem(root, i);
1162
    if(!item) {
1163
      ret.retCode = SET_CONF_RET_ERR_INNER;
1164
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1165
      return ret;
1166
    }
1167
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1168
      ret.retCode = SET_CONF_RET_ERR_PART;
1169
      if (strlen(ret.retMsg) == 0){
1170
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1171
      }else{
1172
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1173
        size_t leftSize = tmp >= 0 ? tmp : 0;
1174
        strncat(ret.retMsg, "|",  leftSize);
1175
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1176
        leftSize = tmp >= 0 ? tmp : 0;
1177
        strncat(ret.retMsg, item->string, leftSize);
1178
      }
1179
    }
1180
  }
1181
  cJSON_Delete(root);
1182
  setConfFlag = true;
1183
  return ret;
1184
}
1185

1186
setConfRet taos_set_config(const char *config){
1187
  taosThreadMutexLock(&setConfMutex);
1188
  setConfRet ret = taos_set_config_imp(config);
1189
  taosThreadMutexUnlock(&setConfMutex);
1190
  return ret;
1191
}
1192
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc