• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4876

10 Dec 2025 05:56AM UTC coverage: 64.632% (+0.2%) from 64.472%
#4876

push

travis-ci

guanshengliang
test: fix idmp case with checkDataMemLoop checked (#33862)

4 of 9 new or added lines in 3 files covered. (44.44%)

380 existing lines in 104 files now uncovered.

162866 of 251990 relevant lines covered (64.63%)

107950382.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.15
/source/client/src/clientEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ttimer.h>
17
#include "cJSON.h"
18
#include "catalog.h"
19
#include "clientInt.h"
20
#include "clientLog.h"
21
#include "clientMonitor.h"
22
#include "functionMgt.h"
23
#include "os.h"
24
#include "osSleep.h"
25
#include "query.h"
26
#include "qworker.h"
27
#include "scheduler.h"
28
#include "tcache.h"
29
#include "tcompare.h"
30
#include "tconv.h"
31
#include "tglobal.h"
32
#include "thttp.h"
33
#include "tmsg.h"
34
#include "tqueue.h"
35
#include "tref.h"
36
#include "trpc.h"
37
#include "tsched.h"
38
#include "ttime.h"
39
#include "tversion.h"
40

41
#include "cus_name.h"
42

43
#define TSC_VAR_NOT_RELEASE 1
44
#define TSC_VAR_RELEASED    0
45

46
/*
 * Abort JSON construction when a cJSON call reports failure.
 *
 * Evaluates `c` exactly once. When it is false the macro logs an error,
 * stores TSDB_CODE_TSC_FAIL_GENERATE_JSON in the caller's local `code`, and
 * jumps to the caller's `_end` label, so the enclosing function must provide
 * both. The argument is parenthesized so that compound expressions such as
 * `a && b` are negated as a whole.
 */
#define ENV_JSON_FALSE_CHECK(c)                      \
  do {                                               \
    if (!(c)) {                                      \
      tscError("failed to add item to JSON object"); \
      code = TSDB_CODE_TSC_FAIL_GENERATE_JSON;       \
      goto _end;                                     \
    }                                                \
  } while (0)
54

55
/*
 * Guard for void initialization routines: evaluates `c` once and, when the
 * result differs from TSDB_CODE_SUCCESS, stores it in terrno and tscInitRes,
 * logs `info` through tscError, and returns from the enclosing function.
 */
#define ENV_ERR_RET(c, info)          \
  do {                                \
    int32_t _code = (c);              \
    if (TSDB_CODE_SUCCESS != _code) { \
      terrno = _code;                 \
      tscInitRes = _code;             \
      tscError(info);                 \
      return;                         \
    }                                 \
  } while (0)
65

66
STscDbg   tscDbg = {0};
67
SAppInfo  appInfo;
68
int64_t   lastClusterId = 0;
69
int32_t   clientReqRefPool = -1;
70
int32_t   clientConnRefPool = -1;
71
int32_t   clientStop = -1;
72
SHashObj *pTimezoneMap = NULL;
73

74
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
75
volatile int32_t    tscInitRes = 0;
76

77
/**
 * Add a freshly created request to the request ref pool and bump the
 * per-connection and per-application request counters.
 *
 * @param pRequest request to register; its `self` field receives the ref id
 * @param pTscObj  connection whose `numOfReqs` counter is incremented
 * @return TSDB_CODE_SUCCESS, or terrno when taosAddRef() returns a negative id
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  if (pRequest->self < 0) {
    tscError("failed to add ref to request");
    return terrno;
  }

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // These counters are updated with 64-bit atomics; keep the full width so
    // the logged values do not wrap once they exceed INT32_MAX.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("req:0x%" PRIx64 ", create request from conn:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ", QID:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}
101

102
/**
 * Append the names stored in `list` to `buf`, separated by commas.
 * For each entry, everything up to and including the first '.' is skipped.
 * Stops early when an entry cannot be fetched, when formatting fails, or once
 * the accumulated length reaches `size`.
 *
 * @param list array of strings
 * @param buf  destination; appended to with strncat, so it must already hold a
 *             NUL-terminated string
 * @param size capacity budget used for the bounded writes
 */
static void concatStrings(SArray *list, char *buf, int size) {
  int used = 0;
  int idx = 0;

  while (idx < taosArrayGetSize(list)) {
    char *name = taosArrayGet(list, idx);
    if (name == NULL) {
      tscError("get dbname failed, buf:%s", buf);
      return;
    }

    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    if (idx != 0) {
      (void)strncat(buf, ",", size - 1 - used);
      used += 1;
    }

    int written = tsnprintf(buf + used, size - used, "%s", name);
    if (written < 0) {
      tscError("snprintf failed, buf:%s, ret:%d", buf, written);
      return;
    }

    used += written;
    if (used >= size) {
      tscInfo("dbList is truncated, buf:%s, len:%d", buf, used);
      return;
    }

    idx++;
  }
}
×
130
#ifdef USE_REPORT
131
/**
 * Build a JSON record describing a slow request and pass its serialized form
 * to monitorPutData2MonitorQueue().
 *
 * @param pTscObj  connection that issued the request
 * @param pRequest request being reported; `sqlstr` is shortened in place while
 *                 it is copied into the record and is always restored before
 *                 this function continues
 * @param reqType  value stored in the "type" field
 * @param duration elapsed time; stored in "query_time" after dividing by 1000
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_OUT_OF_MEMORY when the root
 *         object cannot be created; TSDB_CODE_TSC_FAIL_GENERATE_JSON when a
 *         field cannot be added; TSDB_CODE_FAILED when formatting or printing
 *         fails; otherwise the code returned by monitorPutData2MonitorQueue()
 */
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration) {
  cJSON  *json = cJSON_CreateObject();
  int32_t code = TSDB_CODE_SUCCESS;
  if (json == NULL) {
    tscError("failed to create monitor json");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  char clusterId[32] = {0};
  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0) {
    tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char startTs[32] = {0};
  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start / 1000) < 0) {
    tscError("failed to generate startTs:%" PRId64, pRequest->metric.start / 1000);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  char requestId[32] = {0};
  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0) {
    tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration / 1000)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
      json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));

  // Create the "sql" item first and restore the statement text before the
  // failure check: ENV_JSON_FALSE_CHECK jumps to _end, which previously left
  // pRequest->sqlstr truncated whenever adding the item failed.
  cJSON *sqlItem = NULL;
  if (pRequest->sqlstr != NULL &&
      strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
    char *cut = &pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
    char  saved = *cut;
    *cut = '\0';
    sqlItem = cJSON_CreateString(pRequest->sqlstr);
    *cut = saved;
  } else {
    sqlItem = cJSON_CreateString(pRequest->sqlstr);
  }
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlItem));

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));

  char pid[32] = {0};
  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0) {
    tscError("failed to generate pid:%d", appInfo.pid);
    code = TSDB_CODE_FAILED;
    goto _end;
  }

  ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
  if (pRequest->dbList != NULL) {
    char dbList[1024] = {0};
    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
  } else if (pRequest->pDb != NULL) {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
  } else {
    ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
  }

  char *value = cJSON_PrintUnformatted(json);
  if (value == NULL) {
    tscError("failed to print json");
    code = TSDB_CODE_FAILED;
    goto _end;
  }
  MonitorSlowLogData data = {0};
  data.clusterId = pTscObj->pAppInfo->clusterId;
  data.type = SLOW_LOG_WRITE;
  data.data = value;
  code = monitorPutData2MonitorQueue(data);
  if (TSDB_CODE_SUCCESS != code) {
    // The serialized text is freed here only on failure; on success it is not released by this function.
    taosMemoryFree(value);
    goto _end;
  }

_end:
  cJSON_Delete(json);
  return code;
}
219
#endif
220
/**
 * Decide whether a request may be slow-logged given an excluded database.
 *
 * When the request has `pDb` set, only that name is compared. Otherwise every
 * entry of `dbList` is compared after skipping everything up to and including
 * its first '.'.
 *
 * @return false when the request touches `exceptDb` or a list entry cannot be
 *         fetched; true otherwise
 */
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char *exceptDb) {
  if (pRequest->pDb != NULL) {
    return 0 != strcmp(pRequest->pDb, exceptDb);
  }

  int idx = 0;
  while (idx < taosArrayGetSize(pRequest->dbList)) {
    char *name = taosArrayGet(pRequest->dbList, idx);
    if (name == NULL) {
      tscError("get dbname failed, exceptDb:%s", exceptDb);
      return false;
    }

    char *sep = strchr(name, '.');
    if (sep != NULL) {
      name = sep + 1;
    }

    if (0 == strcmp(name, exceptDb)) {
      return false;
    }
    ++idx;
  }

  return true;
}
241

242
/**
 * Undo registerRequest(): decrement the request counters, account the elapsed
 * time to the insert/query totals, emit monitoring and slow-log records when
 * USE_REPORT is defined, and release the connection reference.
 *
 * @param pRequest request being torn down; a NULL pointer is logged and ignored
 */
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  // currentRequests is updated with a 64-bit atomic; keep the full width so
  // the logged value cannot wrap.
  int64_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("req:0x%" PRIx64 ", free from conn:0x%" PRIx64 ", QID:0x%" PRIx64
           ", elapsed:%.2f ms, current:%d, app current:%" PRId64,
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  // pQuery is only inspected while the node allocator is acquired.
  if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
    if ((pRequest->pQuery && pRequest->pQuery->pRoot && QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
         (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
        QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", insert duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);
      (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("req:0x%" PRIx64 ", query duration:%" PRId64 "us, parseCost:%" PRId64 "us, ctgCost:%" PRId64
               "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us, QID:0x%" PRIx64,
               pRequest->self, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs,
               pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs,
               pRequest->requestId);

      (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }

    if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
      tscError("failed to release allocator");
    }
  }

#ifdef USE_REPORT
  if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
    } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
    }
  }

  // duration is in microseconds; the threshold is scaled by 1000000 before comparing.
  if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL) &&
      checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
    (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, connId:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
      if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
        slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
        if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
          tscError("failed to generate write slow log");
        }
      }
    }
  }
#endif

  releaseTscObj(pTscObj->id);
}
317

318
// todo close the transporter properly
319
/**
 * Close the RPC transporter owned by an app instance.
 * Does nothing when `pAppInfo` or its transporter is NULL.
 */
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}
327

328
/**
 * Retry filter installed as `rpcInit.rfp` in openTransporter().
 *
 * @param code    error code of the failed message
 * @param msgType type of the failed message
 * @return true for redirect errors on anything but the listed TDMT_SCH_*
 *         messages, and for the listed queue/sync error codes; false otherwise
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    bool isSchedMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                      msgType == TDMT_SCH_DROP_TASK || msgType == TDMT_SCH_TASK_NOTIFY;
    return !isSchedMsg;
  }

  if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY || code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE ||
      code == TSDB_CODE_SYN_WRITE_STALL || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
      code == TSDB_CODE_SYN_RESTORING) {
    tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
    return true;
  }

  return false;
}
345

346
// start timer for particular msgType
347
/**
 * Tell whether a message type is one of TDMT_VND_SUBMIT or
 * TDMT_VND_CREATE_TABLE. `code` is not consulted.
 */
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE;
}
353

354
// TODO refactor
355
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
1,861,188✔
356
  SRpcInit rpcInit;
1,725,574✔
357
  (void)memset(&rpcInit, 0, sizeof(rpcInit));
1,861,188✔
358
  rpcInit.localPort = 0;
1,861,188✔
359
  rpcInit.label = "TSC";
1,861,188✔
360
  rpcInit.numOfThreads = tsNumOfRpcThreads;
1,861,188✔
361
  rpcInit.cfp = processMsgFromServer;
1,861,188✔
362
  rpcInit.rfp = clientRpcRfp;
1,861,188✔
363
  rpcInit.sessions = 1024;
1,861,188✔
364
  rpcInit.connType = TAOS_CONN_CLIENT;
1,861,188✔
365
  rpcInit.user = (char *)user;
1,861,188✔
366
  rpcInit.idleTime = tsShellActivityTimer * 1000;
1,861,188✔
367
  rpcInit.compressSize = tsCompressMsgSize;
1,861,188✔
368
  rpcInit.dfp = destroyAhandle;
1,861,188✔
369

370
  rpcInit.retryMinInterval = tsRedirectPeriod;
1,861,188✔
371
  rpcInit.retryStepFactor = tsRedirectFactor;
1,861,188✔
372
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
1,861,188✔
373
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
1,861,188✔
374

375
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
1,861,188✔
376
  connLimitNum = TMAX(connLimitNum, 10);
1,861,188✔
377
  connLimitNum = TMIN(connLimitNum, 1000);
1,861,188✔
378
  rpcInit.connLimitNum = connLimitNum;
1,861,188✔
379
  rpcInit.shareConnLimit = tsShareConnLimit;
1,861,188✔
380
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
1,861,188✔
381
  rpcInit.startReadTimer = 1;
1,861,188✔
382
  rpcInit.readTimeout = tsReadTimeout;
1,861,188✔
383
  rpcInit.ipv6 = tsEnableIpv6;
1,861,188✔
384
  rpcInit.enableSSL = tsEnableTLS;
1,861,188✔
385

386
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
1,861,188✔
387
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
1,861,188✔
388
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
1,861,188✔
389
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
1,861,188✔
390
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
1,861,188✔
391

392
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
1,861,188✔
393
  if (TSDB_CODE_SUCCESS != code) {
1,861,188✔
394
    tscError("invalid version string.");
×
395
    return code;
×
396
  }
397

398
  tscInfo("rpc max retry timeout %" PRId64 "", rpcInit.retryMaxTimeout);
1,861,188✔
399
  *pDnodeConn = rpcOpen(&rpcInit);
1,861,188✔
400
  if (*pDnodeConn == NULL) {
1,861,188✔
401
    tscError("failed to init connection to server since %s", tstrerror(terrno));
52✔
402
    code = terrno;
52✔
403
  }
404

405
  return code;
1,861,188✔
406
}
407

408
/**
 * Walk a hash whose entries are request ref ids and call destroyRequest() on
 * every request that can still be acquired.
 */
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRid = pIter;
    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    destroyRequest(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
3,534,941✔
422

UNCOV
423
/**
 * Walk a hash whose entries are request ref ids and call taos_stop_query() on
 * every request that can still be acquired.
 */
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    int64_t     *pRid = pIter;
    SRequestObj *pReq = acquireRequest(*pRid);
    if (pReq == NULL) {
      continue;
    }

    taos_stop_query(pReq);
    (void)releaseRequest(*pRid);  // ignore error
  }
}
×
437

438
void destroyAppInst(void *info) {
1,861,136✔
439
  SAppInstInfo *pAppInfo = *(SAppInstInfo **)info;
1,861,136✔
440
  tscInfo("destroy app inst mgr %p", pAppInfo);
1,861,136✔
441

442
  int32_t code = taosThreadMutexLock(&appInfo.mutex);
1,861,136✔
443
  if (TSDB_CODE_SUCCESS != code) {
1,861,136✔
444
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
445
  }
446

447
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
1,861,136✔
448

449
  code = taosThreadMutexUnlock(&appInfo.mutex);
1,861,136✔
450
  if (TSDB_CODE_SUCCESS != code) {
1,861,136✔
451
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
452
  }
453

454
  taosMemoryFreeClear(pAppInfo->instKey);
1,861,136✔
455
  closeTransporter(pAppInfo);
1,861,136✔
456

457
  code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
1,861,136✔
458
  if (TSDB_CODE_SUCCESS != code) {
1,861,136✔
459
    tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
460
  }
461

462
  taosArrayDestroy(pAppInfo->pQnodeList);
1,861,136✔
463
  code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
1,861,136✔
464
  if (TSDB_CODE_SUCCESS != code) {
1,861,136✔
465
    tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
466
  }
467

468
  taosMemoryFree(pAppInfo);
1,861,136✔
469
}
1,861,136✔
470

471
void destroyTscObj(void *pObj) {
3,534,823✔
472
  if (NULL == pObj) {
3,534,823✔
473
    return;
×
474
  }
475

476
  STscObj *pTscObj = pObj;
3,534,823✔
477
  int64_t  tscId = pTscObj->id;
3,534,823✔
478
  tscTrace("conn:%" PRIx64 ", begin destroy, p:%p", tscId, pTscObj);
3,534,823✔
479

480
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
3,534,823✔
481
  hbDeregisterConn(pTscObj, connKey);
3,534,823✔
482

483
  destroyAllRequests(pTscObj->pRequests);
3,534,941✔
484
  taosHashCleanup(pTscObj->pRequests);
3,534,941✔
485

486
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
3,534,941✔
487
  tscDebug("conn:0x%" PRIx64 ", p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
3,534,941✔
488
           pTscObj->pAppInfo->numOfConns);
489

490
  // In any cases, we should not free app inst here. Or an race condition rises.
491
  /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
3,534,941✔
492

493
  (void)taosThreadMutexDestroy(&pTscObj->mutex);
3,534,941✔
494
  taosMemoryFree(pTscObj);
3,534,941✔
495

496
  tscTrace("conn:0x%" PRIx64 ", end destroy, p:%p", tscId, pTscObj);
3,534,941✔
497
}
498

499
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
3,790,132✔
500
                     STscObj **pObj) {
501
  *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
3,790,132✔
502
  if (NULL == *pObj) {
3,790,132✔
503
    return terrno;
×
504
  }
505

506
  (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,790,132✔
507
  if (NULL == (*pObj)->pRequests) {
3,790,014✔
508
    taosMemoryFree(*pObj);
×
509
    return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
510
  }
511

512
  (*pObj)->connType = connType;
3,790,014✔
513
  (*pObj)->pAppInfo = pAppInfo;
3,790,014✔
514
  (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
3,790,014✔
515
  tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
3,790,014✔
516
  (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
3,790,014✔
517

518
  if (db != NULL) {
3,790,014✔
519
    tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
3,789,433✔
520
  }
521

522
  TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
3,790,014✔
523

524
  int32_t code = TSDB_CODE_SUCCESS;
3,789,756✔
525

526
  (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
3,789,756✔
527
  if ((*pObj)->id < 0) {
3,789,315✔
528
    tscError("failed to add object to clientConnRefPool");
×
529
    code = terrno;
×
530
    taosMemoryFree(*pObj);
×
531
    return code;
×
532
  }
533

534
  (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
3,789,315✔
535

536
  tscInfo("conn:0x%" PRIx64 ", created, p:%p", (*pObj)->id, *pObj);
3,790,132✔
537
  return code;
3,790,132✔
538
}
539

540
/** Acquire the connection registered under `rid` in clientConnRefPool; returns the taosAcquireRef() result. */
STscObj *acquireTscObj(int64_t rid) {
  STscObj *pConn = (STscObj *)taosAcquireRef(clientConnRefPool, rid);
  return pConn;
}
790,445,915✔
541

542
void releaseTscObj(int64_t rid) {
786,725,286✔
543
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
786,725,286✔
544
  if (TSDB_CODE_SUCCESS != code) {
786,733,626✔
545
    tscWarn("failed to release TscObj, code:%s", tstrerror(code));
×
546
  }
547
}
786,733,626✔
548

549
/**
 * Allocate a request object bound to connection `connId` and register it.
 *
 * @param connId   ref id of the owning connection
 * @param type     request type stored in the object
 * @param reqid    request id to use; 0 asks generateRequestId() for one
 * @param pRequest receives the new request on success and NULL on failure
 *                 after the initial allocation succeeded
 * @return TSDB_CODE_SUCCESS or an error code
 *
 * Failure handling: the connection reference taken by acquireTscObj() is
 * released on every failure path. doDestroyRequest() only releases it through
 * deregisterRequest(), which runs when `self` is non-zero, so the remaining
 * cases are released here. The sync-query parameter block is also freed when
 * the request is discarded before doDestroyRequest() can own it.
 */
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
  int32_t          code = TSDB_CODE_SUCCESS;
  STscObj         *pTscObj = NULL;
  SSyncQueryParam *interParam = NULL;
  bool             interSemReady = false;

  *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == *pRequest) {
    return terrno;
  }

  pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    TSC_ERR_JRET(TSDB_CODE_TSC_DISCONNECTED);
  }
  interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (interParam == NULL) {
    // Jump explicitly so a zero terrno can never fall through to the dereference below.
    code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }
  TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
  interSemReady = true;
  interParam->pRequest = *pRequest;
  (*pRequest)->body.interParam = interParam;

  (*pRequest)->resType = RES_TYPE__QUERY;
  (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
  (*pRequest)->metric.start = taosGetTimestampUs();

  (*pRequest)->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
  (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
  (*pRequest)->type = type;
  (*pRequest)->allocatorRefId = -1;

  (*pRequest)->pDb = getDbOfConnection(pTscObj);
  if (NULL == (*pRequest)->pDb) {
    TSC_ERR_JRET(terrno);
  }
  (*pRequest)->pTscObj = pTscObj;
  (*pRequest)->inCallback = false;
  (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == (*pRequest)->msgBuf) {
    code = terrno;
    goto _return;
  }
  (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
  TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));

  return TSDB_CODE_SUCCESS;

_return:
  if ((*pRequest)->pTscObj) {
    // Read `self` first: doDestroyRequest() frees the request.
    bool connReleasedByDestroy = ((*pRequest)->self != 0);
    doDestroyRequest(*pRequest);
    if (!connReleasedByDestroy) {
      releaseTscObj(connId);
    }
  } else {
    if (interParam != NULL) {
      if (interSemReady && TSDB_CODE_SUCCESS != tsem_destroy(&interParam->sem)) {
        tscError("failed to destroy semaphore in pRequest");
      }
      taosMemoryFree(interParam);
    }
    if (pTscObj != NULL) {
      releaseTscObj(connId);
    }
    taosMemoryFree(*pRequest);
  }
  *pRequest = NULL;
  return code;
}
602

603
/**
 * Free every buffer owned by a result-info structure and clear the pointers.
 * `convertBuf` is an array of `numOfCols` buffers; each element is freed
 * before the array itself.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);
  taosMemoryFreeClear(pResInfo->decompBuf);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}
702,028,602✔
620

621
/** Acquire the request registered under `rid` in clientReqRefPool; returns the taosAcquireRef() result. */
SRequestObj *acquireRequest(int64_t rid) {
  SRequestObj *pReq = (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
  return pReq;
}
2,147,483,647✔
622

623
/** Release one reference on request `rid` in clientReqRefPool; returns the taosReleaseRef() result. */
int32_t releaseRequest(int64_t rid) {
  int32_t ret = taosReleaseRef(clientReqRefPool, rid);
  return ret;
}
2,147,483,647✔
624

625
/** Remove request `rid` from clientReqRefPool; returns the taosRemoveRef() result. */
int32_t removeRequest(int64_t rid) {
  int32_t ret = taosRemoveRef(clientReqRefPool, rid);
  return ret;
}
662,508,373✔
626

627
/// Follows the prevRefId chain from pRequest, removes the earliest reachable request from the ref pool, and returns its ref id.
628
int64_t removeFromMostPrevReq(SRequestObj *pRequest) {
  int64_t      headRefId = pRequest->self;
  SRequestObj *pCur = pRequest;

  // Walk backwards until the chain ends or a predecessor can no longer be acquired.
  while (pCur->relation.prevRefId) {
    pCur = acquireRequest(pCur->relation.prevRefId);
    if (pCur == NULL) {
      break;
    }
    headRefId = pCur->self;
    (void)releaseRequest(headRefId);  // ignore error
  }

  (void)removeRequest(headRefId);  // ignore error
  return headRefId;
}
643

644
void destroyNextReq(int64_t nextRefId) {
662,497,605✔
645
  if (nextRefId) {
662,497,605✔
646
    SRequestObj *pObj = acquireRequest(nextRefId);
×
647
    if (pObj) {
×
648
      (void)releaseRequest(nextRefId);  // ignore error
×
649
      (void)releaseRequest(nextRefId);  // ignore error
×
650
    }
651
  }
652
}
662,497,605✔
653

654
/**
 * Remove every request linked to `pRequest` through its prev/next chain from
 * the request ref pool. Returns immediately when `pRequest` is a sub request,
 * i.e. its userRefId is set and differs from its own ref id.
 *
 * At most MAX_PREV_REQS predecessors are collected; a longer chain is reported
 * and the walk stops instead of writing past the local array. Ref ids are
 * recorded while each predecessor is still acquired, so nothing is read from a
 * request after its reference has been released.
 */
void destroySubRequests(SRequestObj *pRequest) {
  enum { MAX_PREV_REQS = 16 };
  int64_t  prevRefIds[MAX_PREV_REQS] = {0};
  int32_t  numPrev = 0;
  uint64_t tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  // Collect predecessors, nearest first.
  tmpRefId = pRequest->relation.prevRefId;
  while (tmpRefId) {
    SRequestObj *pPrev = acquireRequest(tmpRefId);
    if (pPrev == NULL) {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
    if (numPrev >= MAX_PREV_REQS) {
      tscError("too many prev reqs, stop at ref 0x%" PRIx64, tmpRefId);
      (void)releaseRequest(tmpRefId);  // ignore error
      break;
    }

    uint64_t curRefId = tmpRefId;
    prevRefIds[numPrev++] = pPrev->self;
    tmpRefId = pPrev->relation.prevRefId;  // read while the reference is held
    (void)releaseRequest(curRefId);        // ignore error
  }

  // Remove them starting from the earliest request in the chain.
  for (int32_t i = numPrev - 1; i >= 0; i--) {
    (void)removeRequest(prevRefIds[i]);  // ignore error
  }

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    SRequestObj *pNext = acquireRequest(tmpRefId);
    if (pNext == NULL) {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    tmpRefId = pNext->relation.nextRefId;
    (void)removeRequest(pNext->self);   // ignore error
    (void)releaseRequest(pNext->self);  // ignore error
  }
}
693

694
void doDestroyRequest(void *p) {
662,508,793✔
695
  if (NULL == p) {
662,508,793✔
696
    return;
×
697
  }
698

699
  SRequestObj *pRequest = (SRequestObj *)p;
662,508,793✔
700

701
  uint64_t reqId = pRequest->requestId;
662,508,793✔
702
  tscTrace("QID:0x%" PRIx64 ", begin destroy request, res:%p", reqId, pRequest);
662,511,799✔
703

704
  int64_t nextReqRefId = pRequest->relation.nextRefId;
662,511,799✔
705

706
  int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
662,508,132✔
707
  if (TSDB_CODE_SUCCESS != code) {
662,510,115✔
708
    tscDebug("failed to remove request from hash since %s", tstrerror(code));
3,790,034✔
709
  }
710
  schedulerFreeJob(&pRequest->body.queryJob, 0);
662,510,115✔
711

712
  destorySqlCallbackWrapper(pRequest->pWrapper);
662,510,344✔
713

714
  taosMemoryFreeClear(pRequest->msgBuf);
662,505,256✔
715

716
  doFreeReqResultInfo(&pRequest->body.resInfo);
662,502,625✔
717
  if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
662,496,623✔
718
    tscError("failed to destroy semaphore");
×
719
  }
720

721
  taosArrayDestroy(pRequest->tableList);
662,505,340✔
722
  taosArrayDestroy(pRequest->targetTableList);
662,501,332✔
723
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
662,501,191✔
724

725
  if (pRequest->self) {
662,504,262✔
726
    deregisterRequest(pRequest);
662,508,292✔
727
  }
728

729
  taosMemoryFreeClear(pRequest->pDb);
662,506,742✔
730
  taosArrayDestroy(pRequest->dbList);
662,499,289✔
731
  if (pRequest->body.interParam) {
662,502,254✔
732
    if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
662,505,645✔
733
      tscError("failed to destroy semaphore in pRequest");
×
734
    }
735
  }
736
  taosMemoryFree(pRequest->body.interParam);
662,502,308✔
737

738
  qDestroyQuery(pRequest->pQuery);
662,501,038✔
739
  nodesDestroyAllocator(pRequest->allocatorRefId);
662,502,437✔
740

741
  taosMemoryFreeClear(pRequest->effectiveUser);
662,508,097✔
742
  taosMemoryFreeClear(pRequest->sqlstr);
662,508,409✔
743
  taosMemoryFree(pRequest);
662,502,863✔
744
  tscTrace("QID:0x%" PRIx64 ", end destroy request, res:%p", reqId, pRequest);
662,503,436✔
745
  destroyNextReq(nextReqRefId);
662,503,436✔
746
}
747

748
/**
 * Stop the query attached to the request and then hand the request to
 * removeFromMostPrevReq(), whose result is intentionally ignored.
 * A NULL request is a no-op.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    (void)removeFromMostPrevReq(pRequest);
  }
}
754

755
/**
 * Mark the request as killed and, when it is a scheduled query, free its
 * scheduler job with TSDB_CODE_TSC_QUERY_KILLED.
 */
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  // Only requests that carry a query running in schedule mode own a job to stop.
  bool isScheduledQuery = (NULL != pRequest->pQuery) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);

  if (isScheduledQuery) {
    schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
    tscDebug("QID:0x%" PRIx64 ", killed", pRequest->requestId);
  } else {
    tscDebug("QID:0x%" PRIx64 ", no need to be killed since not query", pRequest->requestId);
  }
}
767

768
/**
 * Stop the given request together with every request linked to it through
 * relation.prevRefId / relation.nextRefId.
 *
 * Nothing is done when the request has a userRefId that differs from its own
 * ref id. Previous requests are collected first and stopped in reverse
 * collection order, then the request itself, then the next requests in chain
 * order. Every acquired request is released again after it has been stopped.
 */
void stopAllQueries(SRequestObj *pRequest) {
  int32_t       reqIdx = -1;
  SRequestObj  *pReqList[16] = {NULL};
  uint64_t      tmpRefId = 0;
  const int32_t maxPrevReqs = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  SRequestObj *pTmp = pRequest;
  while (pTmp->relation.prevRefId) {
    tmpRefId = pTmp->relation.prevRefId;

    // pReqList is a fixed-size stack array; never write past its end.
    if (reqIdx + 1 >= maxPrevReqs) {
      tscError("too many prev reqs, stop collecting at ref 0x%" PRIx64, tmpRefId);
      break;
    }

    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      pReqList[++reqIdx] = pTmp;
    } else {
      tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }

  for (int32_t i = reqIdx; i >= 0; i--) {
    taosStopQueryImpl(pReqList[i]);
    (void)releaseRequest(pReqList[i]->self);  // ignore error
  }

  taosStopQueryImpl(pRequest);

  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    pTmp = acquireRequest(tmpRefId);
    if (pTmp) {
      // Read the link before releasing the request.
      tmpRefId = pTmp->relation.nextRefId;
      taosStopQueryImpl(pTmp);
      (void)releaseRequest(pTmp->self);  // ignore error
    } else {
      tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
      break;
    }
  }
}
809
#ifdef USE_REPORT
810
/**
 * Reset clientStop to -1, the value the crash report thread expects to find
 * before it starts.
 */
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}
×
811

812
static void *tscCrashReportThreadFp(void *param) {
×
813
  int32_t code = 0;
×
814
  setThreadName("client-crashReport");
×
815
  char filepath[PATH_MAX] = {0};
×
816
  (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
×
817
  char     *pMsg = NULL;
×
818
  int64_t   msgLen = 0;
×
819
  TdFilePtr pFile = NULL;
×
820
  bool      truncateFile = false;
×
821
  int32_t   sleepTime = 200;
×
822
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
×
823
  int32_t   loopTimes = reportPeriodNum;
×
824

825
#ifdef WINDOWS
826
  if (taosCheckCurrentInDll()) {
827
    atexit(crashReportThreadFuncUnexpectedStopped);
828
  }
829
#endif
830

831
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
×
832
    return NULL;
×
833
  }
834
  STelemAddrMgmt mgt;
×
835
  code = taosTelemetryMgtInit(&mgt, tsTelemServer);
×
836
  if (code) {
×
837
    tscError("failed to init telemetry management, code:%s", tstrerror(code));
×
838
    return NULL;
×
839
  }
840

841
  code = initCrashLogWriter();
×
842
  if (code) {
×
843
    tscError("failed to init crash log writer, code:%s", tstrerror(code));
×
844
    return NULL;
×
845
  }
846

847
  while (1) {
848
    checkAndPrepareCrashInfo();
×
849
    if (clientStop > 0 && reportThreadSetQuit()) break;
×
850
    if (loopTimes++ < reportPeriodNum) {
×
851
      if (loopTimes < 0) loopTimes = reportPeriodNum;
×
852
      taosMsleep(sleepTime);
×
853
      continue;
×
854
    }
855

856
    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
×
857
    if (pMsg && msgLen > 0) {
×
858
      if (taosSendTelemReport(&mgt, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
×
859
        tscError("failed to send crash report");
×
860
        if (pFile) {
×
861
          taosReleaseCrashLogFile(pFile, false);
×
862
          pFile = NULL;
×
863

864
          taosMsleep(sleepTime);
×
865
          loopTimes = 0;
×
866
          continue;
×
867
        }
868
      } else {
869
        tscInfo("succeed to send crash report");
×
870
        truncateFile = true;
×
871
      }
872
    } else {
873
      tscInfo("no crash info was found");
×
874
    }
875

876
    taosMemoryFree(pMsg);
×
877

878
    if (pMsg && msgLen > 0) {
×
879
      pMsg = NULL;
×
880
      continue;
×
881
    }
882

883
    if (pFile) {
×
884
      taosReleaseCrashLogFile(pFile, truncateFile);
×
885
      pFile = NULL;
×
886
      truncateFile = false;
×
887
    }
888

889
    taosMsleep(sleepTime);
×
890
    loopTimes = 0;
×
891
  }
892
  taosTelemetryDestroy(&mgt);
×
893

894
  clientStop = -2;
×
895
  return NULL;
×
896
}
897

898
int32_t tscCrashReportInit() {
1,791,804✔
899
  if (!tsEnableCrashReport) {
1,791,804✔
900
    return TSDB_CODE_SUCCESS;
1,791,804✔
901
  }
902
  int32_t      code = TSDB_CODE_SUCCESS;
×
903
  TdThreadAttr thAttr;
×
904
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
×
905
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
×
906
  TdThread crashReportThread;
×
907
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
×
908
    tscError("failed to create crashReport thread since %s", strerror(ERRNO));
×
909
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
910
    TSC_ERR_RET(terrno);
×
911
  }
912

913
  (void)taosThreadAttrDestroy(&thAttr);
×
914
_return:
×
915
  if (code) {
×
916
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
917
    TSC_ERR_RET(terrno);
×
918
  }
919

920
  return code;
×
921
}
922

923
void tscStopCrashReport() {
1,792,088✔
924
  if (!tsEnableCrashReport) {
1,792,088✔
925
    return;
1,792,088✔
926
  }
927

928
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
×
929
    tscDebug("crash report thread already stopped");
×
930
    return;
×
931
  }
932

933
  while (atomic_load_32(&clientStop) > 0) {
×
934
    taosMsleep(100);
×
935
  }
936
}
937

938
void taos_write_crashinfo(int signum, void *sigInfo, void *context) {
×
939
  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
×
940
}
×
941
#endif
942

943
#ifdef TAOSD_INTEGRATED
944
// State of the embedded daemon thread.
typedef struct {
  TdThread pid;   // handle of the daemon thread
  int32_t  stat;  // < 0: start failed, 0: init(not start), 1: start successfully
} SDaemonObj;

// Daemon entry points implemented outside this file.
extern int  dmStartDaemon(int argc, char const *argv[]);
extern void dmStopDaemon();

// Single global daemon state, zero-initialised (stat == 0: not started).
SDaemonObj daemonObj = {0};

// Command-line style arguments handed to the daemon thread.
typedef struct {
  int32_t argc;
  char  **argv;
} SExecArgs;
958

959
static void *dmStartDaemonFunc(void *param) {
960
  int32_t    code = 0;
961
  SExecArgs *pArgs = (SExecArgs *)param;
962
  int32_t    argc = pArgs->argc;
963
  char     **argv = pArgs->argv;
964

965
  code = dmStartDaemon(argc, (const char **)argv);
966
  if (code != 0) {
967
    printf("failed to start taosd since %s\r\n", tstrerror(code));
968
    goto _exit;
969
  }
970

971
_exit:
972
  if (code != 0) {
973
    atomic_store_32(&daemonObj.stat, code);
974
  }
975
  return NULL;
976
}
977

978
static int32_t shellStartDaemon(int argc, char *argv[]) {
979
  int32_t    code = 0, lino = 0;
980
  SExecArgs *pArgs = NULL;
981
  int64_t    startMs = taosGetTimestampMs(), endMs = startMs;
982

983
  TdThreadAttr thAttr;
984
  (void)taosThreadAttrInit(&thAttr);
985
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
986
#ifdef TD_COMPACT_OS
987
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
988
#endif
989
  pArgs = (SExecArgs *)taosMemoryCalloc(1, sizeof(SExecArgs));
990
  if (pArgs == NULL) {
991
    code = terrno;
992
    TAOS_CHECK_EXIT(code);
993
  }
994
  pArgs->argc = argc;
995
  pArgs->argv = argv;
996

997
#ifndef TD_AS_LIB
998
  tsLogEmbedded = 1;
999
#endif
1000

1001
  TAOS_CHECK_EXIT(taosThreadCreate(&daemonObj.pid, &thAttr, dmStartDaemonFunc, pArgs));
1002

1003
  while (true) {
1004
    if (atomic_load_64(&tsDndStart)) {
1005
      atomic_store_32(&daemonObj.stat, 1);
1006
      break;
1007
    }
1008
    int32_t daemonstat = atomic_load_32(&daemonObj.stat);
1009
    if (daemonstat < 0) {
1010
      code = daemonstat;
1011
      TAOS_CHECK_EXIT(code);
1012
    }
1013

1014
    if (daemonstat > 1) {
1015
      code = TSDB_CODE_APP_ERROR;
1016
      TAOS_CHECK_EXIT(code);
1017
    }
1018
    taosMsleep(1000);
1019
  }
1020

1021
_exit:
1022
  endMs = taosGetTimestampMs();
1023
  (void)taosThreadAttrDestroy(&thAttr);
1024
  taosMemoryFreeClear(pArgs);
1025
  if (code) {
1026
    printf("\r\n The daemon start failed at line %d since %s, cost %" PRIi64 " ms\r\n", lino, tstrerror(code),
1027
           endMs - startMs);
1028
  } else {
1029
    printf("\r\n The daemon started successfully, cost %" PRIi64 " ms\r\n", endMs - startMs);
1030
  }
1031
#ifndef TD_AS_LIB
1032
  tsLogEmbedded = 0;
1033
#endif
1034
  return code;
1035
}
1036

1037
void shellStopDaemon() {
1038
#ifndef TD_AS_LIB
1039
  tsLogEmbedded = 1;
1040
#endif
1041
  dmStopDaemon();
1042
  if (taosCheckPthreadValid(daemonObj.pid)) {
1043
    (void)taosThreadJoin(daemonObj.pid, NULL);
1044
    taosThreadClear(&daemonObj.pid);
1045
  }
1046
}
1047
#endif
1048

1049
/**
 * One-time initialisation of the client driver.
 *
 * Sets up, in order: optional memory debugging (Linux only), the atexit
 * cleanup hook, appInfo and its instance maps, logging, configuration, the
 * query/charset/timezone/monitor/rpc modules, the regex cache, catalog,
 * scheduler, client id, task queue, function management, node allocators, the
 * connection and request ref pools, and finally the optional crash report
 * thread, parser keywords and embedded daemon.
 *
 * The function returns nothing; the explicit failure branches below record the
 * error in tscInitRes and return early.
 * NOTE(review): ENV_ERR_RET presumably does the same on a non-zero result — confirm
 * against the macro definition.
 */
void taos_init_imp(void) {
#if defined(LINUX)
  if (tscDbg.memEnable) {
    int32_t code = taosMemoryDbgInit();
    if (code) {
      (void)printf("failed to init memory dbg, error:%s\n", tstrerror(code));
    } else {
      // Memory debugging is active: switch logging to synchronous mode.
      tsAsyncLog = false;
      (void)printf("memory dbg enabled\n");
    }
  }
#endif

  // In the APIs of other program language, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
  (void)atexit(taos_cleanup);
  // Start from a clean error state.
  SET_ERRNO(TSDB_CODE_SUCCESS);
  terrno = TSDB_CODE_SUCCESS;
  taosSeedRand(taosGetTimestampSec());

  appInfo.pid = taosGetPId();
  appInfo.startTime = taosGetTimestampMs();
  // Two maps of app instances: one keyed by a binary key, one keyed by cluster id.
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  appInfo.pInstMapByClusterId =
      taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
    (void)printf("failed to allocate memory when init appInfo\n");
    tscInitRes = terrno;
    return;
  }
  // Only pInstMap gets a free callback for its entries.
  taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);

  const char *logName = CUS_PROMPT "log";
  ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
  if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
    (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(ERRNO), configDir);
    tscInitRes = terrno;
    return;
  }

  // The two builds differ only in the last argument passed to taosInitCfg.
#ifdef TAOSD_INTEGRATED
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0), "failed to init cfg");
#else
  ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
#endif

  initQueryModuleMsgHandle();
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if ((tsCharsetCxt = taosConvInit(tsCharset)) == NULL) {
    tscInitRes = terrno;
    tscError("failed to init conv");
    return;
  }
#endif
#if !defined(WINDOWS) && !defined(TD_ASTRA)
  ENV_ERR_RET(tzInit(), "failed to init timezone");
#endif
  ENV_ERR_RET(monitorInit(), "failed to init monitor");
  ENV_ERR_RET(rpcInit(), "failed to init rpc");

  if (InitRegexCache() != 0) {
    tscInitRes = terrno;
    (void)printf("failed to init regex cache\n");
    return;
  }

  tscInfo("starting to initialize TAOS driver");

  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
  ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
  ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
  ENV_ERR_RET(initClientId(), "failed to init clientId");

  ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
  ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
  ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");

  // Ref pools for connections and requests, each with its destructor callback.
  // NOTE(review): the results of taosOpenRef are not checked here.
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);

  ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
  ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
#ifdef USE_REPORT
  ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
#endif
  ENV_ERR_RET(qInitKeywordsTable(), "failed to init parser keywords table");
#ifdef TAOSD_INTEGRATED
  ENV_ERR_RET(shellStartDaemon(0, NULL), "failed to start taosd daemon");
#endif
  tscInfo("TAOS driver is initialized successfully");
}
1140

1141
int taos_init() {
2,573,659✔
1142
  (void)taosThreadOnce(&tscinit, taos_init_imp);
2,573,659✔
1143
  return tscInitRes;
2,573,414✔
1144
}
1145

1146
/**
 * Map a TSDB_OPTION value to the name of its configuration item.
 *
 * @param option option to translate
 * @return the configuration item name, or NULL for options without a mapping
 */
const char *getCfgName(TSDB_OPTION option) {
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      return "shellActivityTimer";
    case TSDB_OPTION_LOCALE:
      return "locale";
    case TSDB_OPTION_CHARSET:
      return "charset";
    case TSDB_OPTION_TIMEZONE:
      return "timezone";
    case TSDB_OPTION_USE_ADAPTER:
      return "useAdapter";
    default:
      return NULL;
  }
}
1171

1172
/**
 * Apply one client option.
 *
 * TSDB_OPTION_CONFIGDIR is handled before the driver is initialised: the path
 * is copied into configDir, and on non-Windows builds an unquoted path longer
 * than one character is first wrapped in double quotes. Every other option
 * initialises the driver and is then written to the matching config item.
 *
 * @param option option to set
 * @param str    new value; must not be NULL for TSDB_OPTION_CONFIGDIR
 * @return 0 on success; -1 for a NULL or too long config dir, a failed
 *         initialisation or an unknown option; otherwise the code returned by
 *         cfgSetItem / taosCfgDynamicOptions
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (option == TSDB_OPTION_CONFIGDIR) {
    // strlen() and the copy below would dereference a NULL pointer.
    if (str == NULL) {
      tscError("Invalid config dir: NULL");
      return -1;
    }
#ifndef WINDOWS
    char   newstr[PATH_MAX];
    size_t len = strlen(str);
    if (len > 1 && str[0] != '"' && str[0] != '\'') {
      // Two quotes plus the terminator must still fit into newstr.
      if (len + 2 >= (size_t)PATH_MAX) {
        tscError("Too long path %s", str);
        return -1;
      }
      newstr[0] = '"';
      (void)memcpy(newstr + 1, str, len);
      newstr[len + 1] = '"';
      newstr[len + 2] = '\0';
      str = newstr;
    }
#endif
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", configDir, str);
    return 0;
  }

  // initialize global config
  if (taos_init() != 0) {
    return -1;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;
  const char  *name = getCfgName(option);

  if (name == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  pItem = cfgGetItem(pCfg, name);
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", name, str);
    // Only these two options are passed on to taosCfgDynamicOptions after being set.
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosCfgDynamicOptions(pCfg, name, false);
    }
  }

  return code;
}
1226

1227
/**
1228
 * The request id is an unsigned integer format of 64bit.
1229
 *+------------+-----+-----------+---------------+
1230
 *| uid|localIp| PId | timestamp | serial number |
1231
 *+------------+-----+-----------+---------------+
1232
 *| 12bit      |12bit|24bit      |16bit          |
1233
 *+------------+-----+-----------+---------------+
1234
 * @return
1235
 */
1236
uint64_t generateRequestId() {
705,003,255✔
1237
  static uint32_t hashId = 0;
1238
  static int32_t  requestSerialId = 0;
1239

1240
  if (hashId == 0) {
705,003,255✔
1241
    int32_t code = taosGetSystemUUIDU32(&hashId);
1,789,972✔
1242
    if (code != TSDB_CODE_SUCCESS) {
1,789,972✔
1243
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
×
1244
               tstrerror(code));
1245
    }
1246
  }
1247

1248
  uint64_t id = 0;
705,005,176✔
1249

1250
  while (true) {
×
1251
    int64_t  ts = taosGetTimestampMs();
705,007,572✔
1252
    uint64_t pid = taosGetPId();
705,007,572✔
1253
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
705,002,534✔
1254
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
705,010,151✔
1255

1256
    id = (((uint64_t)(hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
705,010,415✔
1257
    if (id) {
705,010,415✔
1258
      break;
705,010,415✔
1259
    }
1260
  }
1261
  return id;
705,010,415✔
1262
}
1263

1264
#if 0
1265
#include "cJSON.h"
1266
static setConfRet taos_set_config_imp(const char *config){
1267
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
1268
  static bool setConfFlag = false;
1269
  if (setConfFlag) {
1270
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
1271
    tstrncpy(ret.retMsg, "configuration can only set once", RET_MSG_LENGTH);
1272
    return ret;
1273
  }
1274
  taosInitGlobalCfg();
1275
  cJSON *root = cJSON_Parse(config);
1276
  if (root == NULL){
1277
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
1278
    tstrncpy(ret.retMsg, "parse json error", RET_MSG_LENGTH);
1279
    return ret;
1280
  }
1281

1282
  int size = cJSON_GetArraySize(root);
1283
  if(!cJSON_IsObject(root) || size == 0) {
1284
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
1285
    tstrncpy(ret.retMsg, "json content is invalid, must be not empty object", RET_MSG_LENGTH);
1286
    return ret;
1287
  }
1288

1289
  if(size >= 1000) {
1290
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
1291
    tstrncpy(ret.retMsg, "json object size is too long", RET_MSG_LENGTH);
1292
    return ret;
1293
  }
1294

1295
  for(int i = 0; i < size; i++){
1296
    cJSON *item = cJSON_GetArrayItem(root, i);
1297
    if(!item) {
1298
      ret.retCode = SET_CONF_RET_ERR_INNER;
1299
      tstrncpy(ret.retMsg, "inner error", RET_MSG_LENGTH);
1300
      return ret;
1301
    }
1302
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
1303
      ret.retCode = SET_CONF_RET_ERR_PART;
1304
      if (strlen(ret.retMsg) == 0){
1305
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
1306
      }else{
1307
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1308
        size_t leftSize = tmp >= 0 ? tmp : 0;
1309
        strncat(ret.retMsg, "|",  leftSize);
1310
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
1311
        leftSize = tmp >= 0 ? tmp : 0;
1312
        strncat(ret.retMsg, item->string, leftSize);
1313
      }
1314
    }
1315
  }
1316
  cJSON_Delete(root);
1317
  setConfFlag = true;
1318
  return ret;
1319
}
1320

1321
setConfRet taos_set_config(const char *config){
1322
  taosThreadMutexLock(&setConfMutex);
1323
  setConfRet ret = taos_set_config_imp(config);
1324
  taosThreadMutexUnlock(&setConfMutex);
1325
  return ret;
1326
}
1327
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc