• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4864

26 Nov 2025 05:46AM UTC coverage: 64.548% (+0.009%) from 64.539%
#4864

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

769 of 945 new or added lines in 33 files covered. (81.38%)

3006 existing lines in 116 files now uncovered.

158227 of 245129 relevant lines covered (64.55%)

111826500.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.44
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42

43
#define TSC_VAR_NOT_RELEASE 1
44
#define TSC_VAR_RELEASED    0
45

46
/*
 * Abort JSON construction when the checked expression evaluates to false.
 * The expansion logs an error, stores TSDB_CODE_TSC_FAIL_GENERATE_JSON in the
 * enclosing function's `code` variable and jumps to its `_end` label, so both
 * must exist at the expansion site.
 * The argument is parenthesized so compound expressions (e.g. `a && b`) are
 * negated as a whole rather than only their first operand.
 */
#define ENV_JSON_FALSE_CHECK(c)                     \
  do {                                              \
    if (!(c)) {                                     \
      tscError("faild to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;      \
      goto _end;                                    \
    }                                               \
  } while (0)
54

55
/*
 * Evaluate `c` exactly once. When the result is not TSDB_CODE_SUCCESS, store
 * it in both terrno and tscInitRes, log `info`, and return from the enclosing
 * (void) function.
 */
#define ENV_ERR_RET(c, info)                  \
  do {                                        \
    int32_t envErrCode_ = (c);                \
    if (TSDB_CODE_SUCCESS != envErrCode_) {   \
      terrno = envErrCode_;                   \
      tscInitRes = envErrCode_;               \
      tscError(info);                         \
      return;                                 \
    }                                         \
  } while (0)
65

66
STscDbg   tscDbg = {0};
67
SAppInfo  appInfo;
68
int64_t   lastClusterId = 0;
69
int32_t   clientReqRefPool = -1;
70
int32_t   clientConnRefPool = -1;
71
int32_t   clientStop = -1;
72
SHashObj *pTimezoneMap = NULL;
73

74
int32_t timestampDeltaLimit = 900;  // s
75

76
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
77
volatile int32_t    tscInitRes = 0;
78

79
/*
 * Add a freshly created request to the request ref pool and account for it.
 *
 * On success pRequest->self holds the id returned by taosAddRef(), the
 * connection's numOfReqs is incremented and, when the connection has an app
 * instance attached, that instance's totalRequests and currentRequests
 * counters are incremented too.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when taosAddRef() yields a negative id.
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t connReqs = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  // No app instance attached: nothing more to count.
  if (NULL == pTscObj->pAppInfo) {
    return TSDB_CODE_SUCCESS;
  }

  SAppClusterSummary *pAppSummary = &pTscObj->pAppInfo->summary;
  int32_t             totalReqs = atomic_add_fetch_64((int64_t *)&pAppSummary->totalRequests, 1);
  int32_t             appReqs = atomic_add_fetch_64((int64_t *)&pAppSummary->currentRequests, 1);
  tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64
           ", current:%d, app current:%d, total:%d, QID:0x%" PRIx64,
           pRequest->self, pRequest->pTscObj->id, connReqs, appReqs, totalReqs, pRequest->requestId);

  return TSDB_CODE_SUCCESS;
}
103

104
/*
 * Join the names stored in `list` into `buf`, separated by commas.
 * Everything up to and including the first '.' of each entry is dropped.
 * Stops early (with a log line) when an element cannot be fetched, when
 * formatting fails, or when the running length reaches `size`.
 */
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;
  int idx = 0;

  while (idx < taosArrayGetSize(list)) {
    char *name = taosArrayGet(list, idx);
    if (name == NULL) {
      tscError("get dbname failed, buf:%s", buf);
      return;
    }

    // Keep only the part after the first '.', when there is one.
    char *sep = strchr(name, '.');
    name = (sep != NULL) ? sep + 1 : name;

    if (idx > 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      return;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      return;
    }

    idx++;
  }
}
×
132
#ifdef USE_REPORT
133
/*
 * Build the JSON slow-log record for a finished request and hand it to the
 * monitor queue.
 *
 * pTscObj   connection the request ran on (supplies cluster id, user, config)
 * pRequest  the finished request
 * reqType   slow-log type value stored in the "type" field
 * duration  elapsed time; it is divided by 1000 for the "query_time" field
 *
 * Returns TSDB_CODE_SUCCESS when the record was queued, otherwise an error
 * code. The printed JSON string is freed here when queuing fails; the cJSON
 * tree is always deleted before returning.
 */
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("failed to create monitor json");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
  if (pRequest->sqlstr != NULL &&
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    // Temporarily terminate the SQL text at the configured maximum length so
    // only the prefix is copied into the JSON item.
    char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
    cJSON *sqlItem = cJSON_CreateString(pRequest->sqlstr);
    // Restore the overwritten byte BEFORE the check below: the check may jump
    // to _end, and the request's SQL text must not stay truncated.
    pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlItem));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
  // "db": prefer the request's db list, then its single db name, else "".
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // Queuing failed, so the printed string is still ours to free.
    taosMemoryFree(value);
    goto _end;
  }

_end:
  cJSON_Delete(json);
  return code;
}
221
#endif
222
/*
 * Decide whether a request passes the slow-log "except db" filter.
 *
 * When the request has a single db name, the result is true iff that name
 * differs from exceptDb. Otherwise every entry of the request's db list is
 * compared (after dropping everything up to the first '.'); the result is
 * false as soon as one entry equals exceptDb or cannot be fetched, and true
 * when no entry matches.
 */
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (NULL != pRequest->pDb) {
    return 0 != strcmp(pRequest->pDb, exceptDb);
  }

  int idx = 0;
  while (idx < taosArrayGetSize(pRequest->dbList)) {
    char *name = taosArrayGet(pRequest->dbList, idx);
    if (name == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    char *sep = strchr(name, '.');
    if (NULL != sep) {
      name = sep + 1;
    }

    if (0 == strcmp(name, exceptDb)) {
      return false;
    }
    ++idx;
  }

  return true;
}
243

244
/*
 * Undo the bookkeeping done when the request was registered and emit the
 * per-request statistics.
 *
 * Decrements the app-instance and connection request counters, adds the
 * elapsed time to the insert/query totals depending on the statement kind,
 * and (when built with USE_REPORT) feeds the monitor and slow-log facilities.
 * Finally releases the connection object by its id.
 */
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
           ", elapsed:%.2f ms, current:%d, app current:%d",
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the allocator is acquired.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

#ifdef USE_REPORT
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // The threshold is multiplied by 1000000 before being compared with
  // `duration` (microseconds). Do the multiplication in int64_t: an
  // `unsigned long` product is only 32 bits wide on some platforms and would
  // also make the comparison unsigned, so a negative duration (clock stepped
  // backwards) would be reported as slow.
  int64_t slowThresholdUs = (int64_t)pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000;
  if ((duration >= slowThresholdUs) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }
#endif

  releaseTscObj(pTscObj->id);
}
319

320
// todo close the transporter properly
321
/*
 * Close the RPC transporter of an app instance.
 * Does nothing when pAppInfo or its transporter is NULL.
 */
void closeTransporter(SAppInstInfo *pAppInfo) {
  bool hasTransporter = (pAppInfo != NULL) && (pAppInfo->pTransporter != NULL);
  if (!hasTransporter) {
    return;
  }

  tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
  rpcClose(pAppInfo->pTransporter);
}
329

330
/*
 * Retry predicate installed as rpcInit.rfp.
 *
 * - Redirect-class errors (NEED_REDIRECT_ERROR): true, except for the
 *   scheduler message types listed below, for which the result is false.
 * - Queue-out-of-memory / sync stall / not-ready / restoring errors: true.
 * - Anything else: false.
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    bool isSchedMsg = (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                       msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                       msgType == TDMT_SCH_DROP_TASK || msgType == TDMT_SCH_TASK_NOTIFY);
    return !isSchedMsg;
  }

  if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY || code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE ||
      code == TSDB_CODE_SYN_WRITE_STALL || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
      code == TSDB_CODE_SYN_RESTORING) {
    tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
    return true;
  }

  return false;
}
347

348
// start timer for particular msgType
349
/*
 * Returns true only for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE messages;
 * `code` is not consulted.
 */
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (msgType == TDMT_VND_SUBMIT) || (msgType == TDMT_VND_CREATE_TABLE);
}
355

356
// TODO refactor
357
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
1,348,630✔
358
  SRpcInit rpcInit;
1,249,491✔
359
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
1,348,630✔
360
  rpcInit.localPort = 0;
1,348,630✔
361
  rpcInit.label = "TSC";
1,348,630✔
362
  rpcInit.numOfThreads = tsNumOfRpcThreads;
1,348,630✔
363
  rpcInit.cfp = processMsgFromServer;
1,348,630✔
364
  rpcInit.rfp = clientRpcRfp;
1,348,630✔
365
  rpcInit.sessions = 1024;
1,348,630✔
366
  rpcInit.connType = TAOS_CONN_CLIENT;
1,348,630✔
367
  rpcInit.user = (char *)user;
1,348,630✔
368
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,348,630✔
369
  rpcInit.compressSize = tsCompressMsgSize;
1,348,630✔
370
  rpcInit.dfp = destroyAhandle;
1,348,630✔
371

372
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,348,630✔
373
  rpcInit.retryStepFactor = tsRedirectFactor;
1,348,630✔
374
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,348,630✔
375
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,348,630✔
376

377
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
1,348,630✔
378
  connLimitNum = TMAX(connLimitNum, 10);
1,348,630✔
379
  connLimitNum = TMIN(connLimitNum, 1000);
1,348,630✔
380
  rpcInit.connLimitNum = connLimitNum;
1,348,630✔
381
  rpcInit.shareConnLimit = tsShareConnLimit;
1,348,630✔
382
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,348,630✔
383
  rpcInit.startReadTimer = 1;
1,348,630✔
384
  rpcInit.readTimeout = tsReadTimeout;
1,348,630✔
385
  rpcInit.ipv6 = tsEnableIpv6;
1,348,630✔
386
  rpcInit.enableSSL = tsEnableTLS;
1,348,630✔
387

388
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,348,630✔
389
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,348,630✔
390
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,348,630✔
391
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,348,630✔
392
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,348,630✔
393

394
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
1,348,630✔
395
  if (TSDB_CODE_SUCCESS != code) {
1,348,630✔
396
    tscError("invalid version string.");
×
397
    return code;
×
398
  }
399

400
  tscInfo("rpc max retry timeout %" PRId64 "", rpcInit.retryMaxTimeout);
1,348,630✔
401
  *pDnodeConn = rpcOpen(&rpcInit);
1,348,630✔
402
  if (*pDnodeConn == NULL) {
1,348,630✔
403
    tscError("failed to init connection to server since %s", tstrerror(terrno));
×
404
    code = terrno;
×
405
  }
406

407
  return code;
1,348,630✔
408
}
409

410
/*
 * Walk the request hash and destroy every request that can still be
 * acquired. Each successful acquire is paired with a release; the release
 * result is ignored.
 */
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRefId = pIter;

    SRequestObj *pReq = acquireRequest(*pRefId);
    if (NULL == pReq) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRefId);  // ignore error
  }
}
3,014,734✔
424

425
/*
 * Walk the request hash and call taos_stop_query() on every request that can
 * still be acquired. Each successful acquire is paired with a release; the
 * release result is ignored.
 */
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t *pRefId = pIter;

    SRequestObj *pReq = acquireRequest(*pRefId);
    if (NULL == pReq) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRefId);  // ignore error
  }
}
263✔
439

440
void destroyAppInst(void *info) {
1,348,630✔
441
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
1,348,630✔
442
  tscInfo("destroy app inst mgr %p", pAppInfo);
1,348,630✔
443

444
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
1,348,630✔
445
  if (TSDB_CODE_SUCCESS != code) {
1,348,630✔
446
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
447
  }
448

449
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
1,348,630✔
450

451
  code = taosThreadMutexUnlock(&appInfo.mutex);
1,348,630✔
452
  if (TSDB_CODE_SUCCESS != code) {
1,348,630✔
453
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
454
  }
455

456
  taosMemoryFreeClear(pAppInfo->instKey);
1,348,630✔
457
  closeTransporter(pAppInfo);
1,348,630✔
458

459
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
1,348,630✔
460
  if (TSDB_CODE_SUCCESS != code) {
1,348,630✔
461
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
462
  }
463

464
  taosArrayDestroy(pAppInfo->pQnodeList);
1,348,630✔
465
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
1,348,630✔
466
  if (TSDB_CODE_SUCCESS != code) {
1,348,630✔
467
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
468
  }
469

470
  taosMemoryFree(pAppInfo);
1,348,630✔
471
}
1,348,630✔
472

473
void destroyTscObj(void *pObj) {
3,013,950✔
474
  if (NULL == pObj) {
3,013,950✔
475
    return;
×
476
  }
477

478
  STscObj *pTscObj = pObj;
3,013,950✔
479
  int64_t  tscId = pTscObj->id;
3,013,950✔
480
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
3,014,025✔
481

482
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
3,014,025✔
483
  hbDeregisterConn(pTscObj, connKey);
3,013,633✔
484

485
  destroyAllRequests(pTscObj->pRequests);
3,014,734✔
486
  taosHashCleanup(pTscObj->pRequests);
3,014,734✔
487

488
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
3,014,734✔
489
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
3,014,734✔
490
           pTscObj->pAppInfo->numOfConns);
491

492
  // In any cases, we should not free app inst here. Or an race condition rises.
493
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
3,014,734✔
494

495
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
3,014,734✔
496
  taosMemoryFree(pTscObj);
3,014,734✔
497

498
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
3,014,734✔
499
}
500

501
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
3,245,895✔
502
                     STscObj **pObj) {
503
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
3,245,895✔
504
  if (NULL == *pObj) {
3,245,895✔
505
    return terrno;
×
506
  }
507

508
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,245,895✔
509
  if (NULL == (*pObj)->pRequests) {
3,245,210✔
510
    taosMemoryFree(*pObj);
×
511
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
512
  }
513

514
  (*pObj)->connType = connType;
3,245,602✔
515
  (*pObj)->pAppInfo = pAppInfo;
3,245,602✔
516
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
3,245,602✔
517
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
3,245,210✔
518
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
3,245,602✔
519

520
  if (db != NULL) {
3,245,602✔
521
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
3,245,501✔
522
  }
523

524
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
3,245,602✔
525

526
  int32_t code = TSDB_CODE_SUCCESS;
3,245,654✔
527

528
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
3,245,654✔
529
  if ((*pObj)->id < 0) {
3,245,683✔
530
    tscError("failed to add object to clientConnRefPool");
×
531
    code = terrno;
×
532
    taosMemoryFree(*pObj);
×
533
    return code;
×
534
  }
535

536
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
3,244,798✔
537

538
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
3,244,931✔
539
  return code;
3,245,895✔
540
}
541

542
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
709,249,644✔
543

544
void releaseTscObj(int64_t rid) {
706,808,931✔
545
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
706,808,931✔
546
  if (TSDB_CODE_SUCCESS != code) {
706,816,840✔
547
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
548
  }
549
}
706,816,840✔
550

551
/*
 * Allocate, initialise and register a request on connection `connId`.
 *
 * connId    ref id of the connection; a reference is acquired on it
 * type      stored as the request type
 * reqid     request id to use, or 0 to generate one
 * pRequest  receives the new request on success and is set to NULL on failure
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure everything acquired
 * so far is released, including the connection reference and the sync-query
 * parameter block.
 */
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;
  bool             interSemReady = false;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }
  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // Always fail here, even if terrno happens to be 0.
    code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
  interSemReady = true;
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    // A NULL db name is only an error when terrno is set.
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    // doDestroyRequest() drops the connection reference only through
    // deregisterRequest(), which it calls when self != 0. When the request
    // never got a ref id, release the connection reference here instead.
    bool connRefStillHeld = (0 == (*pRequest)->self);
    doDestroyRequest(*pRequest);
    if (connRefStillHeld) {
      releaseTscObj(connId);
    }
  } else {
    // The request is not attached to the connection yet, so release the
    // pieces individually.
    if (interSemReady) {
      if (TSDB_CODE_SUCCESS != tsem_destroy(&interParam->sem)) {
        tscError("failed to destroy semaphore in pRequest");
      }
    }
    taosMemoryFree(interParam);
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
  }
  *pRequest = NULL;
  return code;
}
604

605
/*
 * Free every buffer owned by a result-info block: the flat buffers first,
 * then each per-column conversion buffer and the conversion-buffer array.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
661,142,902✔
622

623
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
2,147,483,647✔
624

625
/* Release one reference on request `rid`; returns the taosReleaseRef() result. */
int32_t releaseRequest(int64_t rid) {
  int32_t rc = taosReleaseRef(clientReqRefPool, rid);
  return rc;
}
2,147,483,647✔
626

627
/* Remove request `rid` from the request ref pool; returns the taosRemoveRef() result. */
int32_t removeRequest(int64_t rid) {
  int32_t rc = taosRemoveRef(clientReqRefPool, rid);
  return rc;
}
624,442,951✔
628

629
/// return the most previous req ref id
630
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
624,444,849✔
631
  int64_t      mostPrevReqRefId = pRequest->self;
624,444,849✔
632
  SRequestObj *pTmp = pRequest;
624,447,985✔
633
  while (pTmp->relation.prevRefId) {
624,447,985✔
634
    pTmp = acquireRequest(pTmp->relation.prevRefId);
×
635
    if (pTmp) {
×
636
      mostPrevReqRefId = pTmp->self;
×
637
      (void)releaseRequest(mostPrevReqRefId);  // ignore error
×
638
    } else {
639
      break;
×
640
    }
641
  }
642
  (void)removeRequest(mostPrevReqRefId);  // ignore error
624,446,120✔
643
  return mostPrevReqRefId;
624,447,181✔
644
}
645

646
void destroyNextReq(int64_t nextRefId) {
624,434,307✔
647
  if (nextRefId) {
624,434,307✔
648
    SRequestObj *pObj = acquireRequest(nextRefId);
×
649
    if (pObj) {
×
650
      (void)releaseRequest(nextRefId);  // ignore error
×
651
      (void)releaseRequest(nextRefId);  // ignore error
×
652
    }
653
  }
654
}
624,434,307✔
655

656
/*
 * Remove every request linked to pRequest from the ref pool.
 *
 * Returns immediately when pRequest has a userRefId that is not its own ref
 * id. Otherwise the prevRefId chain is collected and removed starting from
 * the earliest request, then the nextRefId chain is removed in order.
 *
 * At most 16 previous requests are collected. A longer chain is logged and
 * the walk stops, where the unchecked index previously wrote past the end of
 * the local array.
 */
void destroySubRequests(SRequestObj *pRequest) {
  int32_t      reqIdx = -1;
  SRequestObj *pReqList[16] = {NULL};
  uint64_t     tmpRefId = 0;
  const int32_t maxPrev = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      if (reqIdx + 1 >= maxPrev) {
        // No room left: stop instead of overflowing pReqList.
        (void)releaseRequest(tmpRefId);  // ignore error
        tscError("too many prev requests, stop at ref 0x%" PRIx64, tmpRefId);
        break;
      }
      pReqList[++reqIdx] = pTmp;
      (void)releaseRequest(tmpRefId);  // ignore error
    } else {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }

  // Remove the collected requests, earliest first.
  for (int32_t i = reqIdx; i >= 0; i--) {
    (void)removeRequest(pReqList[i]->self);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the next link before the request is removed and released.
      tmpRefId = pTmp->relation.nextRefId;
      (void)removeRequest(pTmp->self);   // ignore error
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
695

696
void doDestroyRequest(void *p) {
624,445,939✔
697
  if (NULL == p) {
624,445,939✔
698
    return;
×
699
  }
700

701
  SRequestObj *pRequest = (SRequestObj *)p;
624,445,939✔
702

703
  uint64_t reqId = pRequest->requestId;
624,445,939✔
704
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
624,449,604✔
705

706
  int64_t nextReqRefId = pRequest->relation.nextRefId;
624,449,604✔
707

708
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
624,445,281✔
709
  if (TSDB_CODE_SUCCESS != code) {
624,448,803✔
710
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
3,245,895✔
711
  }
712
  schedulerFreeJob(&pRequest->body.queryJob, 0);
624,448,803✔
713

714
  destorySqlCallbackWrapper(pRequest->pWrapper);
624,449,920✔
715

716
  taosMemoryFreeClear(pRequest->msgBuf);
624,443,697✔
717

718
  doFreeReqResultInfo(&pRequest->body.resInfo);
624,435,391✔
719
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
624,435,577✔
720
    tscError("failed to destroy semaphore");
×
721
  }
722

723
  taosArrayDestroy(pRequest->tableList);
624,442,688✔
724
  taosArrayDestroy(pRequest->targetTableList);
624,438,334✔
725
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
624,443,659✔
726

727
  if (pRequest->self) {
624,439,509✔
728
    deregisterRequest(pRequest);
624,441,895✔
729
  }
730

731
  taosMemoryFreeClear(pRequest->pDb);
624,443,989✔
732
  taosArrayDestroy(pRequest->dbList);
624,434,279✔
733
  if (pRequest->body.interParam) {
624,442,109✔
734
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
624,447,378✔
735
      tscError("failed to destroy semaphore in pRequest");
×
736
    }
737
  }
738
  taosMemoryFree(pRequest->body.interParam);
624,439,953✔
739

740
  qDestroyQuery(pRequest->pQuery);
624,440,273✔
741
  nodesDestroyAllocator(pRequest->allocatorRefId);
624,440,570✔
742

743
  taosMemoryFreeClear(pRequest->effectiveUser);
624,446,921✔
744
  taosMemoryFreeClear(pRequest->sqlstr);
624,446,467✔
745
  taosMemoryFree(pRequest);
624,435,452✔
746
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
624,437,353✔
747
  destroyNextReq(nextReqRefId);
624,437,353✔
748
}
749

750
/*
 * Public teardown entry for a request object.
 *
 * Stops whatever the request is still running via taos_stop_query() and then
 * passes it to removeFromMostPrevReq(); the result of the latter is
 * deliberately ignored. A NULL request is a no-op.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
756

757
/*
 * Mark a single request as killed and, when it is a scheduler-executed query,
 * free its scheduler job with TSDB_CODE_TSC_QUERY_KILLED.
 *
 * The killed flag is set unconditionally; only requests whose parsed query
 * exists and uses QUERY_EXEC_MODE_SCHEDULE have a job to free.
 */
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  bool isScheduledQuery = (pRequest->pQuery != NULL) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);
  if (isScheduledQuery) {
    schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
    tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
  } else {
    // It is not a query, no need to stop.
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
  }
}
769

770
/*
 * Stop the given request together with every request linked to it through
 * relation.prevRefId / relation.nextRefId.
 *
 * Nothing is done when the request carries a userRefId that differs from its
 * own ref id. Otherwise the prev-chain is walked first and the acquired
 * requests are stopped farthest-first, then the request itself is stopped,
 * then the next-chain is stopped in forward order. Every request acquired
 * here is released again before returning.
 *
 * The prev-chain is buffered in a fixed-size array. Entries that do not fit
 * are stopped and released immediately instead of being written past the end
 * of the array (the original code had no bound check on that write).
 */
void stopAllQueries(SRequestObj *pRequest) {
  enum { MAX_DEFERRED_PREV_REQS = 16 };

  int32_t      reqIdx = -1;
  SRequestObj *pReqList[MAX_DEFERRED_PREV_REQS] = {NULL};
  uint64_t     tmpRefId = 0;
  SRequestObj *pTmp = NULL;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  // Collect the prev-chain; each successful acquire holds a reference.
  tmpRefId = pRequest->relation.prevRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (NULL == pTmp) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    // Read the link before the request can be released below.
    tmpRefId = pTmp->relation.prevRefId;

    if (reqIdx + 1 < MAX_DEFERRED_PREV_REQS) {
      pReqList[++reqIdx] = pTmp;
    } else {
      // Buffer is full: handle this one right away rather than overflowing.
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    }
  }

  // Stop the buffered prev requests, farthest ancestor first.
  for (int32_t i = reqIdx; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  // Stop the next-chain in forward order.
  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
811
#ifdef USE_REPORT
812
/*
 * Atomically stores -1 into clientStop. The crash-report thread registers
 * this function with atexit() on Windows when running inside a DLL.
 */
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
813

814
static void *tscCrashReportThreadFp(void *param) {
×
815
  int32_t code = 0;
×
816
  setThreadName("client-crashReport");
×
817
  char filepath[PATH_MAX] = {0};
×
818
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
819
  char     *pMsg = NULL;
×
820
  int64_t   msgLen = 0;
×
821
  TdFilePtr pFile = NULL;
×
822
  bool      truncateFile = false;
×
823
  int32_t   sleepTime = 200;
×
824
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
825
  int32_t   loopTimes = reportPeriodNum;
×
826

827
#ifdef WINDOWS
828
  if (taosCheckCurrentInDll()) {
829
    atexit(crashReportThreadFuncUnexpectedStopped);
830
  }
831
#endif
832

833
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
834
    return NULL;
×
835
  }
836
  STelemAddrMgmt mgt;
×
837
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
838
  if (code) {
×
839
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
840
    return NULL;
×
841
  }
842

843
  code = initCrashLogWriter();
×
844
  if (code) {
×
845
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
846
    return NULL;
×
847
  }
848

849
  while (1) {
850
    checkAndPrepareCrashInfo();
×
851
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
852
    if (loopTimes++ < reportPeriodNum) {
×
853
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
854
      taosMsleep(sleepTime);
×
855
      continue;
×
856
    }
857

858
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
859
    if (pMsg && msgLen > 0) {
×
860
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
861
        tscError("failed to send crash report");
×
862
        if (pFile) {
×
863
          taosReleaseCrashLogFile(pFile, false);
×
864
          pFile = NULL;
×
865

866
          taosMsleep(sleepTime);
×
867
          loopTimes = 0;
×
868
          continue;
×
869
        }
870
      } else {
871
        tscInfo("succeed to send crash report");
×
872
        truncateFile = true;
×
873
      }
874
    } else {
875
      tscInfo("no crash info was found");
×
876
    }
877

878
    taosMemoryFree(pMsg);
×
879

880
    if (pMsg && msgLen > 0) {
×
881
      pMsg = NULL;
×
882
      continue;
×
883
    }
884

885
    if (pFile) {
×
886
      taosReleaseCrashLogFile(pFile, truncateFile);
×
887
      pFile = NULL;
×
888
      truncateFile = false;
×
889
    }
890

891
    taosMsleep(sleepTime);
×
892
    loopTimes = 0;
×
893
  }
894
  taosTelemetryDestroy(&mgt);
×
895

896
  clientStop = -2;
×
897
  return NULL;
×
898
}
899

900
int32_t tscCrashReportInit() {
1,281,736✔
901
  if (!tsEnableCrashReport) {
1,281,736✔
902
    return TSDB_CODE_SUCCESS;
1,281,736✔
903
  }
904
  int32_t      code = TSDB_CODE_SUCCESS;
×
905
  TdThreadAttr thAttr;
×
906
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
907
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
908
  TdThread crashReportThread;
×
909
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
910
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
911
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
912
    TSC_ERR_RET(terrno);
×
913
  }
914

915
  (void)taosThreadAttrDestroy(&thAttr);
×
916
_return:
×
917
  if (code) {
×
918
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
919
    TSC_ERR_RET(terrno);
×
920
  }
921

922
  return code;
×
923
}
924

925
void tscStopCrashReport() {
1,281,998✔
926
  if (!tsEnableCrashReport) {
1,281,998✔
927
    return;
1,281,998✔
928
  }
929

930
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
931
    tscDebug("crash report thread already stopped");
×
932
    return;
×
933
  }
934

935
  while (atomic_load_32(&clientStop) > 0) {
×
936
    taosMsleep(100);
×
937
  }
938
}
939

940
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
941
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
942
}
×
943
#endif
944

945
#ifdef TAOSD_INTEGRATED
946
typedef struct {
947
  TdThread pid;
948
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
949
} SDaemonObj;
950

951
extern int  dmStartDaemon(int argc, char const *argv[]);
952
extern void dmStopDaemon();
953

954
SDaemonObj daemonObj = {0};
955

956
typedef struct {
957
  int32_t argc;
958
  char  **argv;
959
} SExecArgs;
960

961
static void *dmStartDaemonFunc(void *param) {
962
  int32_t    code = 0;
963
  SExecArgs *pArgs = (SExecArgs *)param;
964
  int32_t    argc = pArgs->argc;
965
  char     **argv = pArgs->argv;
966

967
  code = dmStartDaemon(argc, (const char **)argv);
968
  if (code != 0) {
969
    printf("failed to start taosd since %s\r\n", tstrerror(code));
970
    goto _exit;
971
  }
972

973
_exit:
974
  if (code != 0) {
975
    atomic_store_32(&daemonObj.stat, code);
976
  }
977
  return NULL;
978
}
979

980
static int32_t shellStartDaemon(int argc, char *argv[]) {
981
  int32_t    code = 0, lino = 0;
982
  SExecArgs *pArgs = NULL;
983
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
984

985
  TdThreadAttr thAttr;
986
  (void)taosThreadAttrInit(&thAttr);
987
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
988
#ifdef TD_COMPACT_OS
989
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
990
#endif
991
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
992
  if (pArgs == NULL) {
993
    code = terrno;
994
    TAOS_CHECK_EXIT(code);
995
  }
996
  pArgs->argc = argc;
997
  pArgs->argv = argv;
998

999
#ifndef TD_AS_LIB
1000
  tsLogEmbedded = 1;
1001
#endif
1002

1003
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
1004

1005
  while (true) {
1006
    if (atomic_load_64(&tsDndStart)) {
1007
      atomic_store_32(&daemonObj.stat, 1);
1008
      break;
1009
    }
1010
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
1011
    if (daemonstat < 0) {
1012
      code = daemonstat;
1013
      TAOS_CHECK_EXIT(code);
1014
    }
1015

1016
    if (daemonstat > 1) {
1017
      code = TSDB_CODE_APP_ERROR;
1018
      TAOS_CHECK_EXIT(code);
1019
    }
1020
    taosMsleep(1000);
1021
  }
1022

1023
_exit:
1024
  endMs = taosGetTimestampMs();
1025
  (void)taosThreadAttrDestroy(&thAttr);
1026
  taosMemoryFreeClear(pArgs);
1027
  if (code) {
1028
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1029
           endMs - startMs);
1030
  } else {
1031
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1032
  }
1033
#ifndef TD_AS_LIB
1034
  tsLogEmbedded = 0;
1035
#endif
1036
  return code;
1037
}
1038

1039
void shellStopDaemon() {
1040
#ifndef TD_AS_LIB
1041
  tsLogEmbedded = 1;
1042
#endif
1043
  dmStopDaemon();
1044
  if (taosCheckPthreadValid(daemonObj.pid)) {
1045
    (void)taosThreadJoin(daemonObj.pid, NULL);
1046
    taosThreadClear(&daemonObj.pid);
1047
  }
1048
}
1049
#endif
1050

1051
void taos_init_imp(void) {
1,281,998✔
1052
#if defined(LINUX)
1053
  if (tscDbg.memEnable) {
1,281,998✔
1054
    int32_t code = taosMemoryDbgInit();
×
1055
    if (code) {
×
1056
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
×
1057
    } else {
1058
      tsAsyncLog = false;
×
1059
      (void)printf("memory dbg enabled\n");
×
1060
    }
1061
  }
1062
#endif
1063

1064
  // In the APIs of other program language, taos_cleanup is not available yet.
1065
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
1066
  (void)atexit(taos_cleanup);
1,281,998✔
1067
  SET_ERRNO(TSDB_CODE_SUCCESS);
1,281,998✔
1068
  terrno = TSDB_CODE_SUCCESS;
1,281,998✔
1069
  taosSeedRand(taosGetTimestampSec());
1,281,998✔
1070

1071
  appInfo.pid = taosGetPId();
1,281,998✔
1072
  appInfo.startTime = taosGetTimestampMs();
1,281,998✔
1073
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,281,998✔
1074
  appInfo.pInstMapByClusterId =
1,281,998✔
1075
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,281,998✔
1076
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
1,281,998✔
1077
    (void)printf("failed to allocate memory when init appInfo\n");
×
1078
    tscInitRes = terrno;
×
1079
    return;
×
1080
  }
1081
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
1,281,998✔
1082

1083
  const char *logName = CUS_PROMPT "log";
1,281,998✔
1084
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
1,281,998✔
1085
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
1,281,998✔
1086
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
262✔
1087
    tscInitRes = terrno;
262✔
1088
    return;
262✔
1089
  }
1090

1091
#ifdef TAOSD_INTEGRATED
1092
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
1093
#else
1094
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
1,281,736✔
1095
#endif
1096

1097
  initQueryModuleMsgHandle();
1,281,736✔
1098
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
1099
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
1,281,736✔
1100
    tscInitRes = terrno;
×
1101
    tscError("failed to init conv");
×
1102
    return;
×
1103
  }
1104
#endif
1105
#if !defined(WINDOWS) && !defined(TD_ASTRA)
1106
  ENV_ERR_RET(tzInit(), "failed to init timezone");
1,281,736✔
1107
#endif
1108
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
1,281,736✔
1109
  ENV_ERR_RET(rpcInit(), "failed to init rpc");
1,281,736✔
1110

1111
  if (InitRegexCache() != 0) {
1,281,736✔
1112
    tscInitRes = terrno;
×
1113
    (void)printf("failed to init regex cache\n");
×
1114
    return;
×
1115
  }
1116

1117
  tscInfo("starting to initialize TAOS driver");
1,281,736✔
1118

1119
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
1,281,736✔
1120
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
1,281,736✔
1121
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
1,281,736✔
1122
  ENV_ERR_RET(initClientId(), "failed to init clientId");
1,281,736✔
1123

1124
  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
1,281,736✔
1125
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
1,281,736✔
1126
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
1,281,736✔
1127

1128
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
1,281,736✔
1129
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
1,281,736✔
1130

1131
  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
1,281,736✔
1132
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
1,281,736✔
1133
#ifdef USE_REPORT
1134
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
1,281,736✔
1135
#endif
1136
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
1,281,736✔
1137
#ifdef TAOSD_INTEGRATED
1138
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
1139
#endif
1140
  tscInfo("TAOS driver is initialized successfully");
1,281,736✔
1141
}
1142

1143
int taos_init() {
2,021,799✔
1144
  (void)taosThreadOnce(&tscinit, taos_init_imp);
2,021,799✔
1145
  return tscInitRes;
2,021,799✔
1146
}
1147

1148
/*
 * Map a TSDB_OPTION value to the name of the configuration item it controls.
 *
 * @return the configuration item name, or NULL for an option that has no
 *         entry in this mapping
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      return NULL;
  }
}
1173

1174
/*
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR only records the directory in configDir and does not
 * initialise the driver. On non-Windows builds a path longer than one
 * character that does not already start with a quote is wrapped in double
 * quotes first. Every other option initialises the driver via taos_init() and
 * then sets the configuration item named by getCfgName().
 *
 * @param option option to set
 * @param str    new value; must not be NULL for TSDB_OPTION_CONFIGDIR
 * @return 0 on success; -1 for a NULL or over-long config directory, a failed
 *         taos_init() or an unknown option; otherwise the code returned by
 *         the configuration layer
 *
 * A NULL `str` for TSDB_OPTION_CONFIGDIR is now rejected instead of being
 * passed to strlen()/tstrncpy(), and the length is kept in a size_t rather
 * than being narrowed to int.
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (option == TSDB_OPTION_CONFIGDIR) {
    if (str == NULL) {
      tscError("Invalid config dir: NULL");
      return -1;
    }
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Two quotes plus the terminator must fit into newstr.
      if (len + 2 >= PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", configDir, str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    // These two options are additionally pushed through the dynamic-option path.
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1228

1229
/**
1230
 * The request id is an unsigned integer format of 64bit.
1231
 *+------------+-----+-----------+---------------+
1232
 *| uid|localIp| PId | timestamp | serial number |
1233
 *+------------+-----+-----------+---------------+
1234
 *| 12bit      |12bit|24bit      |16bit          |
1235
 *+------------+-----+-----------+---------------+
1236
 * @return
1237
 */
1238
uint64_t generateRequestId() {
653,207,921✔
1239
  static uint32_t hashId = 0;
1240
  static int32_t  requestSerialId = 0;
1241

1242
  if (hashId == 0) {
653,207,921✔
1243
    int32_t code = taosGetSystemUUIDU32(&hashId);
1,280,072✔
1244
    if (code != TSDB_CODE_SUCCESS) {
1,280,072✔
1245
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1246
               tstrerror(code));
1247
    }
1248
  }
1249

1250
  uint64_t id = 0;
653,206,882✔
1251

1252
  while (true) {
×
1253
    int64_t  ts = taosGetTimestampMs();
653,215,575✔
1254
    uint64_t pid = taosGetPId();
653,215,575✔
1255
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
653,207,606✔
1256
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
653,211,513✔
1257

1258
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
653,211,925✔
1259
    if (id) {
653,211,925✔
1260
      break;
653,211,925✔
1261
    }
1262
  }
1263
  return id;
653,211,925✔
1264
}
1265

1266
#if 0
1267
#include "cJSON.h"
1268
static setConfRet taos_set_config_imp(const char *config){
1269
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1270
  static bool setConfFlag = false;
1271
  if (setConfFlag) {
1272
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1273
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1274
    return ret;
1275
  }
1276
  taosInitGlobalCfg();
1277
  cJSON *root = cJSON_Parse(config);
1278
  if (root == NULL){
1279
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1280
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1281
    return ret;
1282
  }
1283

1284
  int size = cJSON_GetArraySize(root);
1285
  if(!cJSON_IsObject(root) || size == 0) {
1286
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1287
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1288
    return ret;
1289
  }
1290

1291
  if(size >= 1000) {
1292
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1293
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1294
    return ret;
1295
  }
1296

1297
  for(int i = 0; i < size; i++){
1298
    cJSON *item = cJSON_GetArrayItem(root, i);
1299
    if(!item) {
1300
      ret.retCode = SET_CONF_RET_ERR_INNER;
1301
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1302
      return ret;
1303
    }
1304
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1305
      ret.retCode = SET_CONF_RET_ERR_PART;
1306
      if (strlen(ret.retMsg) == 0){
1307
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1308
      }else{
1309
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1310
        size_t leftSize = tmp >= 0 ? tmp : 0;
1311
        strncat(ret.retMsg, "|",  leftSize);
1312
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1313
        leftSize = tmp >= 0 ? tmp : 0;
1314
        strncat(ret.retMsg, item->string, leftSize);
1315
      }
1316
    }
1317
  }
1318
  cJSON_Delete(root);
1319
  setConfFlag = true;
1320
  return ret;
1321
}
1322

1323
setConfRet taos_set_config(const char *config){
1324
  taosThreadMutexLock(&setConfMutex);
1325
  setConfRet ret = taos_set_config_imp(config);
1326
  taosThreadMutexUnlock(&setConfMutex);
1327
  return ret;
1328
}
1329
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc