• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4886

16 Dec 2025 01:13AM UTC coverage: 65.292% (+0.03%) from 65.258%
#4886

push

travis-ci

web-flow
fix: compile error (#33938)

178718 of 273721 relevant lines covered (65.29%)

103311111.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.64
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmisce.h"
28
#include "tmsg.h"
29
#include "tmsgtype.h"
30
#include "tpagedbuf.h"
31
#include "tref.h"
32
#include "tsched.h"
33
#include "tversion.h"
34

35
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
36
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
37

38
int32_t connUpdateSessMgtMetric(int64_t connId, SSessParam* pParam);
39
//int32_t tscUpdateSessMgtMetric(STscObj* pTscObj, SSessParam* pParam);
40

41
void setQueryRequest(int64_t rId) {
112,438,254✔
42
  SRequestObj* pReq = acquireRequest(rId);
112,438,254✔
43
  if (pReq != NULL) {
112,439,132✔
44
    pReq->isQuery = true;
112,428,812✔
45
    (void)releaseRequest(rId);
112,428,812✔
46
  }
47
}
112,438,477✔
48

49
static bool stringLengthCheck(const char* str, size_t maxsize) {
6,549,402✔
50
  if (str == NULL) {
6,549,402✔
51
    return false;
×
52
  }
53

54
  size_t len = strlen(str);
6,549,402✔
55
  if (len <= 0 || len > maxsize) {
6,549,402✔
56
    return false;
×
57
  }
58

59
  return true;
6,549,782✔
60
}
61

62
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
2,689,473✔
63

64
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
2,689,240✔
65

66
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
1,170,620✔
67

68
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
2,689,245✔
69
  char key[512] = {0};
2,689,245✔
70
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
2,689,245✔
71
  return taosStrdup(key);
2,689,245✔
72
}
73

74
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
647,257✔
75
  if (dst == NULL || src == NULL || srcLength == 0) {
647,257✔
76
    return 0;
539✔
77
  }
78

79
  size_t escapeLength = 0;
646,718✔
80
  for (size_t i = 0; i < srcLength; ++i) {
18,349,558✔
81
    if (src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' || src[i] == '\r' ||
17,702,840✔
82
        src[i] == '\t') {
17,702,840✔
83
      escapeLength += 1;
×
84
    }
85
  }
86

87
  size_t dstLength = srcLength;
646,718✔
88
  if (escapeLength == 0) {
646,718✔
89
    (void)memcpy(dst, src, srcLength);
646,718✔
90
  } else {
91
    dstLength = 0;
×
92
    for (size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
×
93
      switch (src[i]) {
×
94
        case '\"':
×
95
          dst[dstLength++] = '\\';
×
96
          dst[dstLength++] = '\"';
×
97
          break;
×
98
        case '\\':
×
99
          dst[dstLength++] = '\\';
×
100
          dst[dstLength++] = '\\';
×
101
          break;
×
102
        case '\b':
×
103
          dst[dstLength++] = '\\';
×
104
          dst[dstLength++] = 'b';
×
105
          break;
×
106
        case '\f':
×
107
          dst[dstLength++] = '\\';
×
108
          dst[dstLength++] = 'f';
×
109
          break;
×
110
        case '\n':
×
111
          dst[dstLength++] = '\\';
×
112
          dst[dstLength++] = 'n';
×
113
          break;
×
114
        case '\r':
×
115
          dst[dstLength++] = '\\';
×
116
          dst[dstLength++] = 'r';
×
117
          break;
×
118
        case '\t':
×
119
          dst[dstLength++] = '\\';
×
120
          dst[dstLength++] = 't';
×
121
          break;
×
122
        default:
×
123
          dst[dstLength++] = src[i];
×
124
      }
125
    }
126
  }
127

128
  return dstLength;
646,718✔
129
}
130

131
bool chkRequestKilled(void* param) {
2,147,483,647✔
132
  bool         killed = false;
2,147,483,647✔
133
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
134
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
135
    killed = true;
×
136
  }
137

138
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
139

140
  return killed;
2,147,483,647✔
141
}
142

143
void cleanupAppInfo() {
1,188,864✔
144
  taosHashCleanup(appInfo.pInstMap);
1,188,864✔
145
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,188,864✔
146
  tscInfo("cluster instance map cleaned");
1,188,864✔
147
}
1,188,864✔
148

149
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
150
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
151
                               STscObj** pTscObj);
152

153
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* totp,
2,689,542✔
154
                              const char* db, uint16_t port, int connType, STscObj** pObj) {
155
  TSC_ERR_RET(taos_init());
2,689,542✔
156
  if (!validateUserName(user)) {
2,689,542✔
157
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
158
  }
159
  int32_t code = 0;
2,689,853✔
160

161
  char localDb[TSDB_DB_NAME_LEN] = {0};
2,689,853✔
162
  if (db != NULL && strlen(db) > 0) {
2,689,853✔
163
    if (!validateDbName(db)) {
1,170,620✔
164
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
165
    }
166

167
    tstrncpy(localDb, db, sizeof(localDb));
1,170,620✔
168
    (void)strdequote(localDb);
1,170,620✔
169
  }
170

171
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
2,689,784✔
172
  if (auth == NULL) {
2,689,784✔
173
    if (!validatePassword(pass)) {
2,689,240✔
174
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
175
    }
176

177
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
2,689,309✔
178
  } else {
179
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
544✔
180
  }
181

182
  int32_t totpCode = -1;
2,689,752✔
183
  if (totp != NULL) {
2,689,752✔
184
    char* endptr = NULL;
×
185
    totpCode = taosStr2Int32(totp, &endptr, 10);
×
186
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
×
187
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
×
188
    }
189
  }
190

191
  SCorEpSet epSet = {0};
2,689,752✔
192
  if (ip) {
2,689,752✔
193
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
913,307✔
194
  } else {
195
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
1,776,445✔
196
  }
197

198
  if (port) {
2,689,277✔
199
    epSet.epSet.eps[0].port = port;
108,361✔
200
    epSet.epSet.eps[1].port = port;
108,361✔
201
  }
202

203
  char* key = getClusterKey(user, secretEncrypt, ip, port);
2,689,277✔
204
  if (NULL == key) {
2,689,060✔
205
    TSC_ERR_RET(terrno);
×
206
  }
207
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
2,689,060✔
208
          user, db, key);
209
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
7,155,132✔
210
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
4,465,823✔
211
  }
212
  // for (int32_t i = 0; i < epSet.epSet.numOfEps; i++) {
213
  //   if ((code = taosValidFqdn(tsEnableIpv6, epSet.epSet.eps[i].fqdn)) != 0) {
214
  //     taosMemFree(key);
215
  //     tscError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6,
216
  //              epSet.epSet.eps[i].fqdn, tstrerror(code));
217
  //     TSC_ERR_RET(code);
218
  //   }
219
  // }
220

221
  SAppInstInfo** pInst = NULL;
2,689,309✔
222
  code = taosThreadMutexLock(&appInfo.mutex);
2,689,309✔
223
  if (TSDB_CODE_SUCCESS != code) {
2,689,309✔
224
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
225
    TSC_ERR_RET(code);
×
226
  }
227

228
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
2,689,309✔
229
  SAppInstInfo* p = NULL;
2,689,309✔
230
  if (pInst == NULL) {
2,689,309✔
231
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
1,244,854✔
232
    if (NULL == p) {
1,244,854✔
233
      TSC_ERR_JRET(terrno);
×
234
    }
235
    p->mgmtEp = epSet;
1,244,854✔
236
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
1,244,854✔
237
    if (TSDB_CODE_SUCCESS != code) {
1,244,854✔
238
      taosMemoryFree(p);
×
239
      TSC_ERR_JRET(code);
×
240
    }
241
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
1,244,854✔
242
    if (TSDB_CODE_SUCCESS != code) {
1,244,854✔
243
      taosMemoryFree(p);
36✔
244
      TSC_ERR_JRET(code);
36✔
245
    }
246
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
1,244,818✔
247
    if (TSDB_CODE_SUCCESS != code) {
1,244,818✔
248
      destroyAppInst(&p);
×
249
      TSC_ERR_JRET(code);
×
250
    }
251
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
1,244,818✔
252
    if (TSDB_CODE_SUCCESS != code) {
1,244,818✔
253
      destroyAppInst(&p);
×
254
      TSC_ERR_JRET(code);
×
255
    }
256
    p->instKey = key;
1,244,818✔
257
    key = NULL;
1,244,818✔
258
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
1,244,818✔
259

260
    pInst = &p;
1,244,818✔
261
  } else {
262
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
1,444,455✔
263
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
264
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
265
    }
266
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
267
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
1,444,455✔
268
  }
269

270
_return:
2,689,309✔
271

272
  if (TSDB_CODE_SUCCESS != code) {
2,689,309✔
273
    (void)taosThreadMutexUnlock(&appInfo.mutex);
36✔
274
    taosMemoryFreeClear(key);
36✔
275
    return code;
36✔
276
  } else {
277
    code = taosThreadMutexUnlock(&appInfo.mutex);
2,689,273✔
278
    taosMemoryFreeClear(key);
2,689,273✔
279
    if (TSDB_CODE_SUCCESS != code) {
2,689,273✔
280
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
281
      return code;
×
282
    }
283
    SSessParam pPara = {.type = SESSION_PER_USER, .value = 1};
2,689,273✔
284
    code = sessMgtUpdateUserMetric((char*)user, &pPara);
2,689,273✔
285
    if (TSDB_CODE_SUCCESS != code) {
2,689,273✔
286
      tscError("failed to connect with user:%s, code:%s", user, tstrerror(code));
×
287
      return code;
×
288
    }
289
    return taosConnectImpl(user, &secretEncrypt[0], totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
2,689,273✔
290
  }
291
}
292

293
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
294
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
295
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
296
//     return *ppAppInstInfo;
297
//   } else {
298
//     return NULL;
299
//   }
300
// }
301

302
void freeQueryParam(SSyncQueryParam* param) {
552,102✔
303
  if (param == NULL) return;
552,102✔
304
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
552,102✔
305
    tscError("failed to destroy semaphore in freeQueryParam");
×
306
  }
307
  taosMemoryFree(param);
552,102✔
308
}
309

310
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
632,786,292✔
311
                     SRequestObj** pRequest, int64_t reqid) {
312
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
632,786,292✔
313
  if (TSDB_CODE_SUCCESS != code) {
632,785,934✔
314
    tscError("failed to malloc sqlObj, %s", sql);
×
315
    return code;
×
316
  }
317

318
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
632,785,934✔
319
  if ((*pRequest)->sqlstr == NULL) {
632,781,222✔
320
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
321
    destroyRequest(*pRequest);
×
322
    *pRequest = NULL;
×
323
    return terrno;
×
324
  }
325

326
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
632,784,530✔
327
  (*pRequest)->sqlstr[sqlLen] = 0;
632,796,098✔
328
  (*pRequest)->sqlLen = sqlLen;
632,793,108✔
329
  (*pRequest)->validateOnly = validateSql;
632,795,363✔
330
  (*pRequest)->stmtBindVersion = 0;
632,793,972✔
331

332
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
632,793,634✔
333

334
  STscObj* pTscObj = (*pRequest)->pTscObj;
632,793,688✔
335
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
632,792,135✔
336
                             sizeof((*pRequest)->self));
337
  if (err) {
632,788,516✔
338
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
339
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
340
    destroyRequest(*pRequest);
×
341
    *pRequest = NULL;
×
342
    return terrno;
×
343
  }
344

345
  (*pRequest)->allocatorRefId = -1;
632,788,516✔
346
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
632,790,049✔
347
    if (TSDB_CODE_SUCCESS !=
186,810,867✔
348
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
186,802,354✔
349
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
350
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
351
      destroyRequest(*pRequest);
×
352
      *pRequest = NULL;
×
353
      return terrno;
×
354
    }
355
  }
356

357
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
632,796,276✔
358
  return TSDB_CODE_SUCCESS;
632,790,001✔
359
}
360

361
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
362
  int32_t code =
363
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
364
  if (TSDB_CODE_SUCCESS == code) {
×
365
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
366
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
367
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
368
    (*pNewRequest)->isSubReq = true;
×
369
  }
370
  return code;
×
371
}
372

373
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
639,387✔
374
  STscObj* pTscObj = pRequest->pTscObj;
639,387✔
375

376
  SParseContext cxt = {
639,907✔
377
      .requestId = pRequest->requestId,
640,096✔
378
      .requestRid = pRequest->self,
638,987✔
379
      .acctId = pTscObj->acctId,
638,897✔
380
      .db = pRequest->pDb,
639,417✔
381
      .topicQuery = topicQuery,
382
      .pSql = pRequest->sqlstr,
639,198✔
383
      .sqlLen = pRequest->sqlLen,
639,507✔
384
      .pMsg = pRequest->msgBuf,
639,907✔
385
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
386
      .pTransporter = pTscObj->pAppInfo->pTransporter,
639,107✔
387
      .pStmtCb = pStmtCb,
388
      .pUser = pTscObj->user,
639,168✔
389
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
639,198✔
390
      .enableSysInfo = pTscObj->sysInfo,
638,768✔
391
      .svrVer = pTscObj->sVer,
638,957✔
392
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
639,786✔
393
      .stmtBindVersion = pRequest->stmtBindVersion,
639,477✔
394
      .setQueryFp = setQueryRequest,
395
      .timezone = pTscObj->optionInfo.timezone,
638,987✔
396
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
639,267✔
397
  };
398

399
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
639,168✔
400
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
640,186✔
401
  if (code != TSDB_CODE_SUCCESS) {
639,666✔
402
    return code;
×
403
  }
404

405
  code = qParseSql(&cxt, pQuery);
639,666✔
406
  if (TSDB_CODE_SUCCESS == code) {
638,550✔
407
    if ((*pQuery)->haveResultSet) {
636,976✔
408
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
409
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
×
410
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
411
    }
412
  }
413

414
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
638,678✔
415
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
636,796✔
416
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
637,528✔
417
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
637,537✔
418
  }
419

420
  taosArrayDestroy(cxt.pTableMetaPos);
638,180✔
421
  taosArrayDestroy(cxt.pTableVgroupPos);
638,396✔
422

423
  return code;
638,315✔
424
}
425

426
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
427
  SRetrieveTableRsp* pRsp = NULL;
×
428
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
429
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
430
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
431
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
432
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
433
                                 pRequest->stmtBindVersion > 0);
×
434
  }
435

436
  return code;
×
437
}
438

439
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
340,376✔
440
  // drop table if exists not_exists_table
441
  if (NULL == pQuery->pCmdMsg) {
340,376✔
442
    return TSDB_CODE_SUCCESS;
×
443
  }
444

445
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
340,376✔
446
  pRequest->type = pMsgInfo->msgType;
340,376✔
447
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
340,376✔
448
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
340,376✔
449

450
  STscObj*      pTscObj = pRequest->pTscObj;
340,376✔
451
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
340,376✔
452

453
  // int64_t transporterId = 0;
454
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
340,376✔
455
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
340,376✔
456
  return TSDB_CODE_SUCCESS;
340,376✔
457
}
458

459
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
1,185,206,715✔
460

461
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
5,089,052✔
462
  SRetrieveTableRsp* pRsp = NULL;
5,089,052✔
463
  if (pRequest->validateOnly) {
5,089,052✔
464
    doRequestCallback(pRequest, 0);
12,231✔
465
    return;
12,231✔
466
  }
467

468
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
10,139,666✔
469
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
10,139,666✔
470
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
5,076,821✔
471
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
2,751,656✔
472
                                 pRequest->stmtBindVersion > 0);
2,751,656✔
473
  }
474

475
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
5,076,821✔
476
  pRequest->code = code;
5,076,821✔
477

478
  if (pRequest->code != TSDB_CODE_SUCCESS) {
5,076,821✔
479
    pResultInfo->numOfRows = 0;
3,777✔
480
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
3,777✔
481
             pRequest->requestId);
482
  } else {
483
    tscDebug(
5,073,044✔
484
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
485
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
486
  }
487

488
  doRequestCallback(pRequest, code);
5,076,821✔
489
}
490

491
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
16,394,737✔
492
  if (pRequest->validateOnly) {
16,394,737✔
493
    doRequestCallback(pRequest, 0);
×
494
    return TSDB_CODE_SUCCESS;
×
495
  }
496

497
  // drop table if exists not_exists_table
498
  if (NULL == pQuery->pCmdMsg) {
16,394,737✔
499
    doRequestCallback(pRequest, 0);
7,791✔
500
    return TSDB_CODE_SUCCESS;
7,791✔
501
  }
502

503
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
16,387,257✔
504
  pRequest->type = pMsgInfo->msgType;
16,386,466✔
505
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
16,387,257✔
506
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
16,386,443✔
507

508
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
16,386,466✔
509
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
16,386,946✔
510

511
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
16,387,001✔
512
  if (code) {
16,387,257✔
513
    doRequestCallback(pRequest, code);
×
514
  }
515
  return code;
16,387,257✔
516
}
517

518
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
365,249✔
519
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
365,249✔
520
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
365,249✔
521

522
  if (node1->load < node2->load) {
365,249✔
523
    return -1;
×
524
  }
525

526
  return node1->load > node2->load;
365,249✔
527
}
528

529
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
55,159✔
530
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
55,159✔
531
  if (pInfo->pQnodeList) {
55,159✔
532
    taosArrayDestroy(pInfo->pQnodeList);
52,605✔
533
    pInfo->pQnodeList = NULL;
52,605✔
534
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
52,605✔
535
  }
536

537
  if (pNodeList) {
55,159✔
538
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
55,159✔
539
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
55,159✔
540
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
55,159✔
541
             taosArrayGetSize(pInfo->pQnodeList));
542
  }
543
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
55,159✔
544

545
  return TSDB_CODE_SUCCESS;
55,159✔
546
}
547

548
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
633,014,807✔
549
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
633,014,807✔
550
    *required = false;
632,864,829✔
551
    return TSDB_CODE_SUCCESS;
632,860,866✔
552
  }
553

554
  int32_t       code = TSDB_CODE_SUCCESS;
149,978✔
555
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
149,978✔
556
  *required = false;
149,978✔
557

558
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
149,978✔
559
  *required = (NULL == pInfo->pQnodeList);
149,978✔
560
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
149,978✔
561
  return TSDB_CODE_SUCCESS;
149,978✔
562
}
563

564
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
565
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
566
  int32_t       code = 0;
×
567

568
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
569
  if (pInfo->pQnodeList) {
×
570
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
571
  }
572
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
573
  if (NULL == *pNodeList) {
×
574
    SCatalog* pCatalog = NULL;
×
575
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
576
    if (TSDB_CODE_SUCCESS == code) {
×
577
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
578
      if (NULL == pNodeList) {
×
579
        TSC_ERR_RET(terrno);
×
580
      }
581
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
582
                               .requestId = pRequest->requestId,
×
583
                               .requestObjRefId = pRequest->self,
×
584
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
585
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
586
    }
587

588
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
589
      code = updateQnodeList(pInfo, *pNodeList);
×
590
    }
591
  }
592

593
  return code;
×
594
}
595

596
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
6,208,545✔
597
  pRequest->type = pQuery->msgType;
6,208,545✔
598
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
6,209,268✔
599

600
  SPlanContext cxt = {.queryId = pRequest->requestId,
7,029,346✔
601
                      .acctId = pRequest->pTscObj->acctId,
6,208,113✔
602
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
6,209,266✔
603
                      .pAstRoot = pQuery->pRoot,
6,209,386✔
604
                      .showRewrite = pQuery->showRewrite,
6,209,266✔
605
                      .pMsg = pRequest->msgBuf,
6,209,236✔
606
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
607
                      .pUser = pRequest->pTscObj->user,
6,209,206✔
608
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
6,208,966✔
609
                      .sysInfo = pRequest->pTscObj->sysInfo};
6,208,203✔
610

611
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
6,207,855✔
612
}
613

614
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
111,982,479✔
615
                         const SExtSchema* pExtSchema, bool isStmt) {
616
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
111,982,479✔
617
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
618
    return TSDB_CODE_INVALID_PARA;
×
619
  }
620

621
  pResInfo->numOfCols = numOfCols;
111,983,137✔
622
  if (pResInfo->fields != NULL) {
111,983,349✔
623
    taosMemoryFree(pResInfo->fields);
18,437✔
624
  }
625
  if (pResInfo->userFields != NULL) {
111,981,274✔
626
    taosMemoryFree(pResInfo->userFields);
18,437✔
627
  }
628
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
111,982,458✔
629
  if (NULL == pResInfo->fields) return terrno;
111,981,373✔
630
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
111,981,662✔
631
  if (NULL == pResInfo->userFields) {
111,980,810✔
632
    taosMemoryFree(pResInfo->fields);
×
633
    return terrno;
×
634
  }
635
  if (numOfCols != pResInfo->numOfCols) {
111,981,347✔
636
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
637
    return TSDB_CODE_FAILED;
×
638
  }
639

640
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
682,578,008✔
641
    pResInfo->fields[i].type = pSchema[i].type;
570,594,973✔
642

643
    pResInfo->userFields[i].type = pSchema[i].type;
570,595,369✔
644
    // userFields must convert to type bytes, no matter isStmt or not
645
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
570,596,959✔
646
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
570,597,642✔
647
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
570,592,323✔
648
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,407,207✔
649
    }
650

651
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
570,592,706✔
652
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
570,597,471✔
653
  }
654
  return TSDB_CODE_SUCCESS;
111,982,962✔
655
}
656

657
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
81,910,425✔
658
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
81,910,425✔
659
      precision != TSDB_TIME_PRECISION_NANO) {
660
    return;
×
661
  }
662

663
  pResInfo->precision = precision;
81,910,425✔
664
}
665

666
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
85,882,756✔
667
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
85,882,756✔
668
  if (NULL == nodeList) {
85,885,049✔
669
    return terrno;
×
670
  }
671
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
85,885,319✔
672

673
  int32_t dbNum = taosArrayGetSize(pDbVgList);
85,885,319✔
674
  for (int32_t i = 0; i < dbNum; ++i) {
169,661,402✔
675
    SArray* pVg = taosArrayGetP(pDbVgList, i);
83,772,061✔
676
    if (NULL == pVg) {
83,772,677✔
677
      continue;
×
678
    }
679
    int32_t vgNum = taosArrayGetSize(pVg);
83,772,677✔
680
    if (vgNum <= 0) {
83,772,497✔
681
      continue;
676,122✔
682
    }
683

684
    for (int32_t j = 0; j < vgNum; ++j) {
269,567,305✔
685
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
186,467,640✔
686
      if (NULL == pInfo) {
186,470,274✔
687
        taosArrayDestroy(nodeList);
×
688
        return TSDB_CODE_OUT_OF_RANGE;
×
689
      }
690
      SQueryNodeLoad load = {0};
186,470,274✔
691
      load.addr.nodeId = pInfo->vgId;
186,470,830✔
692
      load.addr.epSet = pInfo->epSet;
186,469,995✔
693

694
      if (NULL == taosArrayPush(nodeList, &load)) {
186,466,504✔
695
        taosArrayDestroy(nodeList);
×
696
        return terrno;
×
697
      }
698
    }
699
  }
700

701
  int32_t vnodeNum = taosArrayGetSize(nodeList);
85,889,341✔
702
  if (vnodeNum > 0) {
85,887,174✔
703
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
82,820,539✔
704
    goto _return;
82,818,612✔
705
  }
706

707
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
3,066,635✔
708
  if (mnodeNum <= 0) {
3,066,418✔
709
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
710
    goto _return;
×
711
  }
712

713
  void* pData = taosArrayGet(pMnodeList, 0);
3,066,418✔
714
  if (NULL == pData) {
3,066,418✔
715
    taosArrayDestroy(nodeList);
×
716
    return TSDB_CODE_OUT_OF_RANGE;
×
717
  }
718
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
3,066,418✔
719
    taosArrayDestroy(nodeList);
×
720
    return terrno;
×
721
  }
722

723
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
3,066,418✔
724

725
_return:
45,576✔
726

727
  *pNodeList = nodeList;
85,884,881✔
728

729
  return TSDB_CODE_SUCCESS;
85,884,636✔
730
}
731

732
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
82,606✔
733
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
82,606✔
734
  if (NULL == nodeList) {
82,606✔
735
    return terrno;
×
736
  }
737

738
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
82,606✔
739
  if (qNodeNum > 0) {
82,606✔
740
    void* pData = taosArrayGet(pQnodeList, 0);
322✔
741
    if (NULL == pData) {
322✔
742
      taosArrayDestroy(nodeList);
×
743
      return TSDB_CODE_OUT_OF_RANGE;
×
744
    }
745
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
322✔
746
      taosArrayDestroy(nodeList);
×
747
      return terrno;
×
748
    }
749
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
322✔
750
    goto _return;
322✔
751
  }
752

753
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
82,284✔
754
  if (mnodeNum <= 0) {
82,284✔
755
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
56✔
756
    goto _return;
56✔
757
  }
758

759
  void* pData = taosArrayGet(pMnodeList, 0);
82,228✔
760
  if (NULL == pData) {
82,228✔
761
    taosArrayDestroy(nodeList);
×
762
    return TSDB_CODE_OUT_OF_RANGE;
×
763
  }
764
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
82,228✔
765
    taosArrayDestroy(nodeList);
×
766
    return terrno;
×
767
  }
768

769
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
82,228✔
770

771
_return:
×
772

773
  *pNodeList = nodeList;
82,606✔
774

775
  return TSDB_CODE_SUCCESS;
82,606✔
776
}
777

778
void freeVgList(void* list) {
6,164,916✔
779
  SArray* pList = *(SArray**)list;
6,164,916✔
780
  taosArrayDestroy(pList);
6,165,861✔
781
}
6,166,914✔
782

783
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
79,759,058✔
784
  SArray* pDbVgList = NULL;
79,759,058✔
785
  SArray* pQnodeList = NULL;
79,759,058✔
786
  FDelete fp = NULL;
79,759,058✔
787
  int32_t code = 0;
79,759,058✔
788

789
  switch (tsQueryPolicy) {
79,759,058✔
790
    case QUERY_POLICY_VNODE:
79,676,052✔
791
    case QUERY_POLICY_CLIENT: {
792
      if (pResultMeta) {
79,676,052✔
793
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
79,676,052✔
794
        if (NULL == pDbVgList) {
79,676,747✔
795
          code = terrno;
×
796
          goto _return;
×
797
        }
798
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
79,676,747✔
799
        for (int32_t i = 0; i < dbNum; ++i) {
157,284,658✔
800
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
77,606,230✔
801
          if (pRes->code || NULL == pRes->pRes) {
77,606,446✔
802
            continue;
1,065✔
803
          }
804

805
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
155,211,617✔
806
            code = terrno;
×
807
            goto _return;
×
808
          }
809
        }
810
      } else {
811
        fp = freeVgList;
×
812

813
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
814
        if (dbNum > 0) {
×
815
          SCatalog*     pCtg = NULL;
×
816
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
817
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
818
          if (code != TSDB_CODE_SUCCESS) {
×
819
            goto _return;
×
820
          }
821

822
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
823
          if (NULL == pDbVgList) {
×
824
            code = terrno;
×
825
            goto _return;
×
826
          }
827
          SArray* pVgList = NULL;
×
828
          for (int32_t i = 0; i < dbNum; ++i) {
×
829
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
830
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
831
                                     .requestId = pRequest->requestId,
×
832
                                     .requestObjRefId = pRequest->self,
×
833
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
834

835
            // catalogGetDBVgList will handle dbFName == null.
836
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
837
            if (code) {
×
838
              goto _return;
×
839
            }
840

841
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
842
              code = terrno;
×
843
              goto _return;
×
844
            }
845
          }
846
        }
847
      }
848

849
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
79,678,428✔
850
      break;
79,677,434✔
851
    }
852
    case QUERY_POLICY_HYBRID:
82,606✔
853
    case QUERY_POLICY_QNODE: {
854
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
165,002✔
855
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
82,396✔
856
        if (pRes->code) {
82,396✔
857
          pQnodeList = NULL;
×
858
        } else {
859
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
82,396✔
860
          if (NULL == pQnodeList) {
82,396✔
861
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
862
            goto _return;
×
863
          }
864
        }
865
      } else {
866
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
210✔
867
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
210✔
868
        if (pInst->pQnodeList) {
210✔
869
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
210✔
870
          if (NULL == pQnodeList) {
210✔
871
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
872
            goto _return;
×
873
          }
874
        }
875
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
210✔
876
      }
877

878
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
82,606✔
879
      break;
82,606✔
880
    }
881
    default:
419✔
882
      tscError("unknown query policy: %d", tsQueryPolicy);
419✔
883
      return TSDB_CODE_APP_ERROR;
×
884
  }
885

886
_return:
79,760,040✔
887
  taosArrayDestroyEx(pDbVgList, fp);
79,760,040✔
888
  taosArrayDestroy(pQnodeList);
79,759,588✔
889

890
  return code;
79,760,306✔
891
}
892

893
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
6,205,949✔
894
  SArray* pDbVgList = NULL;
6,205,949✔
895
  SArray* pQnodeList = NULL;
6,205,949✔
896
  int32_t code = 0;
6,207,125✔
897

898
  switch (tsQueryPolicy) {
6,207,125✔
899
    case QUERY_POLICY_VNODE:
6,205,809✔
900
    case QUERY_POLICY_CLIENT: {
901
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
6,205,809✔
902
      if (dbNum > 0) {
6,208,268✔
903
        SCatalog*     pCtg = NULL;
6,166,912✔
904
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
6,166,882✔
905
        code = catalogGetHandle(pInst->clusterId, &pCtg);
6,167,235✔
906
        if (code != TSDB_CODE_SUCCESS) {
6,165,339✔
907
          goto _return;
×
908
        }
909

910
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
6,165,339✔
911
        if (NULL == pDbVgList) {
6,167,296✔
912
          code = terrno;
175✔
913
          goto _return;
×
914
        }
915
        SArray* pVgList = NULL;
6,167,121✔
916
        for (int32_t i = 0; i < dbNum; ++i) {
12,332,810✔
917
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
6,165,877✔
918
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
6,166,781✔
919
                                   .requestId = pRequest->requestId,
6,166,715✔
920
                                   .requestObjRefId = pRequest->self,
6,167,180✔
921
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
6,167,098✔
922

923
          // catalogGetDBVgList will handle dbFName == null.
924
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
6,167,609✔
925
          if (code) {
6,167,130✔
926
            goto _return;
×
927
          }
928

929
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
6,166,681✔
930
            code = terrno;
×
931
            goto _return;
×
932
          }
933
        }
934
      }
935

936
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
6,208,822✔
937
      break;
6,207,682✔
938
    }
939
    case QUERY_POLICY_HYBRID:
×
940
    case QUERY_POLICY_QNODE: {
941
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
942

943
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
944
      break;
×
945
    }
946
    default:
1,376✔
947
      tscError("unknown query policy: %d", tsQueryPolicy);
1,376✔
948
      return TSDB_CODE_APP_ERROR;
×
949
  }
950

951
_return:
6,207,222✔
952

953
  taosArrayDestroyEx(pDbVgList, freeVgList);
6,206,917✔
954
  taosArrayDestroy(pQnodeList);
6,207,518✔
955

956
  return code;
6,208,928✔
957
}
958

959
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
6,209,478✔
960
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
6,209,478✔
961

962
  SExecResult      res = {0};
6,209,478✔
963
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
6,209,478✔
964
                           .requestId = pRequest->requestId,
6,209,478✔
965
                           .requestObjRefId = pRequest->self};
6,209,478✔
966
  SSchedulerReq    req = {
7,030,245✔
967
         .syncReq = true,
968
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
6,209,478✔
969
         .pConn = &conn,
970
         .pNodeList = pNodeList,
971
         .pDag = pDag,
972
         .sql = pRequest->sqlstr,
6,209,478✔
973
         .startTs = pRequest->metric.start,
6,209,478✔
974
         .execFp = NULL,
975
         .cbParam = NULL,
976
         .chkKillFp = chkRequestKilled,
977
         .chkKillParam = (void*)pRequest->self,
6,209,478✔
978
         .pExecRes = &res,
979
         .source = pRequest->source,
6,209,478✔
980
         .pWorkerCb = getTaskPoolWorkerCb(),
6,209,478✔
981
  };
982

983
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
6,209,478✔
984

985
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
6,209,131✔
986
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
6,208,813✔
987

988
  if (code != TSDB_CODE_SUCCESS) {
6,208,824✔
989
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
990

991
    pRequest->code = code;
×
992
    terrno = code;
×
993
    return pRequest->code;
×
994
  }
995

996
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
6,208,824✔
997
      TDMT_VND_CREATE_TABLE == pRequest->type) {
15,180✔
998
    pRequest->body.resInfo.numOfRows = res.numOfRows;
6,197,593✔
999
    if (TDMT_VND_SUBMIT == pRequest->type) {
6,198,125✔
1000
      STscObj*            pTscObj = pRequest->pTscObj;
6,193,258✔
1001
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
6,193,296✔
1002
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
6,193,546✔
1003
    }
1004

1005
    schedulerFreeJob(&pRequest->body.queryJob, 0);
6,198,009✔
1006
  }
1007

1008
  pRequest->code = res.code;
6,209,508✔
1009
  terrno = res.code;
6,208,748✔
1010
  return pRequest->code;
6,208,326✔
1011
}
1012

1013
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
450,120,399✔
1014
  SArray*      pArray = NULL;
450,120,399✔
1015
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
450,120,399✔
1016
  if (NULL == pRsp->aCreateTbRsp) {
450,120,399✔
1017
    return TSDB_CODE_SUCCESS;
441,579,600✔
1018
  }
1019

1020
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
8,546,939✔
1021
  for (int32_t i = 0; i < tbNum; ++i) {
20,668,670✔
1022
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
12,121,459✔
1023
    if (pTbRsp->pMeta) {
12,121,429✔
1024
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
11,521,984✔
1025
    }
1026
  }
1027

1028
  return TSDB_CODE_SUCCESS;
8,547,211✔
1029
}
1030

1031
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
67,459,369✔
1032
  int32_t code = 0;
67,459,369✔
1033
  SArray* pArray = NULL;
67,459,369✔
1034
  SArray* pTbArray = (SArray*)res;
67,459,369✔
1035
  int32_t tbNum = taosArrayGetSize(pTbArray);
67,459,369✔
1036
  if (tbNum <= 0) {
67,459,369✔
1037
    return TSDB_CODE_SUCCESS;
×
1038
  }
1039

1040
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
67,459,369✔
1041
  if (NULL == pArray) {
67,458,727✔
1042
    return terrno;
×
1043
  }
1044

1045
  for (int32_t i = 0; i < tbNum; ++i) {
178,091,104✔
1046
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
110,632,377✔
1047
    if (NULL == tbInfo) {
110,632,350✔
1048
      code = terrno;
×
1049
      goto _return;
×
1050
    }
1051
    STbSVersion tbSver = {
110,632,350✔
1052
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
110,632,350✔
1053
    if (NULL == taosArrayPush(pArray, &tbSver)) {
110,632,796✔
1054
      code = terrno;
×
1055
      goto _return;
×
1056
    }
1057
  }
1058

1059
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
67,458,727✔
1060
                           .requestId = pRequest->requestId,
67,459,369✔
1061
                           .requestObjRefId = pRequest->self,
67,458,950✔
1062
                           .mgmtEps = *epset};
1063

1064
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
67,458,923✔
1065

1066
_return:
67,458,474✔
1067

1068
  taosArrayDestroy(pArray);
67,457,964✔
1069
  return code;
67,458,355✔
1070
}
1071

1072
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
8,843,330✔
1073
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
8,843,330✔
1074
}
1075

1076
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
68,582,790✔
1077
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
68,582,790✔
1078
}
1079

1080
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
596,496,680✔
1081
  if (NULL == pRequest->body.resInfo.execRes.res) {
596,496,680✔
1082
    return pRequest->code;
23,835,968✔
1083
  }
1084

1085
  SCatalog*     pCatalog = NULL;
572,657,449✔
1086
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
572,660,648✔
1087

1088
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
572,669,635✔
1089
  if (code) {
572,658,347✔
1090
    return code;
×
1091
  }
1092

1093
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
572,658,347✔
1094
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
572,672,508✔
1095

1096
  switch (pRes->msgType) {
572,674,836✔
1097
    case TDMT_VND_ALTER_TABLE:
3,874,444✔
1098
    case TDMT_MND_ALTER_STB: {
1099
      code = handleAlterTbExecRes(pRes->res, pCatalog);
3,874,444✔
1100
      break;
3,874,444✔
1101
    }
1102
    case TDMT_VND_CREATE_TABLE: {
50,871,747✔
1103
      SArray* pList = (SArray*)pRes->res;
50,871,747✔
1104
      int32_t num = taosArrayGetSize(pList);
50,875,746✔
1105
      for (int32_t i = 0; i < num; ++i) {
106,063,673✔
1106
        void* res = taosArrayGetP(pList, i);
55,186,054✔
1107
        // handleCreateTbExecRes will handle res == null
1108
        code = handleCreateTbExecRes(res, pCatalog);
55,189,301✔
1109
      }
1110
      break;
50,877,619✔
1111
    }
1112
    case TDMT_MND_CREATE_STB: {
333,887✔
1113
      code = handleCreateTbExecRes(pRes->res, pCatalog);
333,887✔
1114
      break;
333,887✔
1115
    }
1116
    case TDMT_VND_SUBMIT: {
450,121,786✔
1117
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
450,121,786✔
1118

1119
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
450,127,600✔
1120
      break;
450,121,624✔
1121
    }
1122
    case TDMT_SCH_QUERY:
67,459,369✔
1123
    case TDMT_SCH_MERGE_QUERY: {
1124
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
67,459,369✔
1125
      break;
67,459,196✔
1126
    }
1127
    default:
180✔
1128
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
180✔
1129
               pRequest->type, pRequest->requestId);
1130
      code = TSDB_CODE_APP_ERROR;
×
1131
  }
1132

1133
  return code;
572,666,770✔
1134
}
1135

1136
static bool incompletaFileParsing(SNode* pStmt) {
589,034,306✔
1137
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
589,034,306✔
1138
}
1139

1140
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
1141
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1142

1143
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
1144
  if (TSDB_CODE_SUCCESS == code) {
×
1145
    int64_t analyseStart = taosGetTimestampUs();
×
1146
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
1147
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1148
  }
1149

1150
  if (TSDB_CODE_SUCCESS == code) {
×
1151
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1152
  }
1153

1154
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1155
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1156
}
×
1157

1158
void returnToUser(SRequestObj* pRequest) {
12,339,932✔
1159
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
12,339,932✔
1160
    // return to client
1161
    doRequestCallback(pRequest, pRequest->code);
12,339,932✔
1162
    return;
12,339,932✔
1163
  }
1164

1165
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1166
  if (pUserReq) {
×
1167
    pUserReq->code = pRequest->code;
×
1168
    // return to client
1169
    doRequestCallback(pUserReq, pUserReq->code);
×
1170
    (void)releaseRequest(pRequest->relation.userRefId);
×
1171
    return;
×
1172
  } else {
1173
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1174
             pRequest->relation.userRefId, pRequest->requestId);
1175
  }
1176
}
1177

1178
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1179
  int64_t     lastTs = 0;
×
1180
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
1181
  int32_t     numOfFields = taos_num_fields(pRes);
×
1182

1183
  int32_t code = createDataBlock(pBlock);
×
1184
  if (code) {
×
1185
    return code;
×
1186
  }
1187

1188
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1189
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1190
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
1191
    if (TSDB_CODE_SUCCESS != code) {
×
1192
      blockDataDestroy(*pBlock);
×
1193
      return code;
×
1194
    }
1195
  }
1196

1197
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1198
  if (TSDB_CODE_SUCCESS != code) {
×
1199
    blockDataDestroy(*pBlock);
×
1200
    return code;
×
1201
  }
1202

1203
  for (int32_t i = 0; i < numOfRows; ++i) {
×
1204
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
1205
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1206
      tscError("invalid data from vnode");
×
1207
      blockDataDestroy(*pBlock);
×
1208
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1209
    }
1210
    int64_t ts = *(int64_t*)pRow[0];
×
1211
    if (lastTs < ts) {
×
1212
      lastTs = ts;
×
1213
    }
1214

1215
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1216
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1217
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
1218
      if (TSDB_CODE_SUCCESS != code) {
×
1219
        blockDataDestroy(*pBlock);
×
1220
        return code;
×
1221
      }
1222
    }
1223

1224
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1225
            *(int64_t*)pRow[2]);
1226
  }
1227

1228
  (*pBlock)->info.window.ekey = lastTs;
×
1229
  (*pBlock)->info.rows = numOfRows;
×
1230

1231
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
1232
  return TSDB_CODE_SUCCESS;
×
1233
}
1234

1235
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
1236
  SRequestObj* pRequest = (SRequestObj*)res;
×
1237
  if (pRequest->code) {
×
1238
    returnToUser(pRequest);
×
1239
    return;
×
1240
  }
1241

1242
  SSDataBlock* pBlock = NULL;
×
1243
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1244
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1245
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1246
             tstrerror(pRequest->code));
1247
    returnToUser(pRequest);
×
1248
    return;
×
1249
  }
1250

1251
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1252
  if (pNextReq) {
×
1253
    continuePostSubQuery(pNextReq, pBlock);
×
1254
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1255
  } else {
1256
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1257
             pRequest->relation.nextRefId, pRequest->requestId);
1258
  }
1259

1260
  blockDataDestroy(pBlock);
×
1261
}
1262

1263
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
1264
  SRequestObj* pRequest = pWrapper->pRequest;
×
1265
  if (TD_RES_QUERY(pRequest)) {
×
1266
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
1267
    return;
×
1268
  }
1269

1270
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1271
  if (pNextReq) {
×
1272
    continuePostSubQuery(pNextReq, NULL);
×
1273
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1274
  } else {
1275
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1276
             pRequest->relation.nextRefId, pRequest->requestId);
1277
  }
1278
}
1279

1280
// todo refacto the error code  mgmt
1281
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
589,996,406✔
1282
  SSqlCallbackWrapper* pWrapper = param;
589,996,406✔
1283
  SRequestObj*         pRequest = pWrapper->pRequest;
589,996,406✔
1284
  STscObj*             pTscObj = pRequest->pTscObj;
590,000,826✔
1285

1286
  pRequest->code = code;
590,005,555✔
1287
  if (pResult) {
590,007,524✔
1288
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
589,964,224✔
1289
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
589,973,901✔
1290
  }
1291

1292
  int32_t type = pRequest->type;
589,992,324✔
1293
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
589,982,335✔
1294
    if (pResult) {
498,247,348✔
1295
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
498,218,969✔
1296

1297
      // record the insert rows
1298
      if (TDMT_VND_SUBMIT == type) {
498,237,408✔
1299
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
444,070,714✔
1300
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
444,072,666✔
1301
      }
1302
    }
1303
    schedulerFreeJob(&pRequest->body.queryJob, 0);
498,265,417✔
1304
  }
1305

1306
  taosMemoryFree(pResult);
590,005,171✔
1307
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
589,991,585✔
1308
           pRequest->requestId);
1309

1310
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
589,992,439✔
1311
      pRequest->stmtBindVersion == 0) {
44,066✔
1312
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
44,066✔
1313
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1314
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
44,066✔
1315
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1316
    }
1317
    restartAsyncQuery(pRequest, code);
44,066✔
1318
    return;
44,066✔
1319
  }
1320

1321
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
589,948,373✔
1322
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
589,948,373✔
1323
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
2,813,740✔
1324
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1325
    }
1326
  }
1327

1328
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
589,951,037✔
1329
  int32_t code1 = handleQueryExecRsp(pRequest);
589,955,909✔
1330
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
589,956,560✔
1331
    pRequest->code = code1;
×
1332
  }
1333

1334
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
1,178,995,380✔
1335
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
589,035,021✔
1336
    continueInsertFromCsv(pWrapper, pRequest);
11,670✔
1337
    return;
11,670✔
1338
  }
1339

1340
  if (pRequest->relation.nextRefId) {
589,951,481✔
1341
    handlePostSubQuery(pWrapper);
×
1342
  } else {
1343
    destorySqlCallbackWrapper(pWrapper);
589,950,725✔
1344
    pRequest->pWrapper = NULL;
589,940,240✔
1345

1346
    // return to client
1347
    doRequestCallback(pRequest, code);
589,942,748✔
1348
  }
1349
}
1350

1351
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
6,547,029✔
1352
  int32_t code = 0;
6,547,029✔
1353
  int32_t subplanNum = 0;
6,547,029✔
1354

1355
  if (pQuery->pRoot) {
6,547,029✔
1356
    pRequest->stmtType = pQuery->pRoot->type;
6,208,891✔
1357
  }
1358

1359
  if (pQuery->pRoot && !pRequest->inRetry) {
6,548,506✔
1360
    STscObj*            pTscObj = pRequest->pTscObj;
6,208,695✔
1361
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
6,208,175✔
1362
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
6,208,638✔
1363
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
6,198,516✔
1364
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
10,320✔
1365
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
10,348✔
1366
    }
1367
  }
1368

1369
  pRequest->body.execMode = pQuery->execMode;
6,547,976✔
1370
  switch (pQuery->execMode) {
6,548,951✔
1371
    case QUERY_EXEC_MODE_LOCAL:
×
1372
      if (!pRequest->validateOnly) {
×
1373
        if (NULL == pQuery->pRoot) {
×
1374
          terrno = TSDB_CODE_INVALID_PARA;
×
1375
          code = terrno;
×
1376
        } else {
1377
          code = execLocalCmd(pRequest, pQuery);
×
1378
        }
1379
      }
1380
      break;
×
1381
    case QUERY_EXEC_MODE_RPC:
340,376✔
1382
      if (!pRequest->validateOnly) {
340,376✔
1383
        code = execDdlQuery(pRequest, pQuery);
340,376✔
1384
      }
1385
      break;
340,376✔
1386
    case QUERY_EXEC_MODE_SCHEDULE: {
6,207,721✔
1387
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
6,207,721✔
1388
      if (NULL == pMnodeList) {
6,208,230✔
1389
        code = terrno;
×
1390
        break;
×
1391
      }
1392
      SQueryPlan* pDag = NULL;
6,208,230✔
1393
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
6,208,230✔
1394
      if (TSDB_CODE_SUCCESS == code) {
6,207,122✔
1395
        pRequest->body.subplanNum = pDag->numOfSubplans;
6,207,649✔
1396
        if (!pRequest->validateOnly) {
6,206,704✔
1397
          SArray* pNodeList = NULL;
6,207,653✔
1398
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
6,207,938✔
1399

1400
          if (TSDB_CODE_SUCCESS == code) {
6,208,035✔
1401
            SSessParam para = {.type = SESSION_MAX_CALL_VNODE_NUM, .value = taosArrayGetSize(pNodeList)};
6,209,028✔
1402
            code = tscUpdateSessMgtMetric(pRequest->pTscObj, &para);
6,207,183✔
1403
          }
1404

1405
          if (TSDB_CODE_SUCCESS == code) {
6,209,423✔
1406
            code = scheduleQuery(pRequest, pDag, pNodeList);
6,209,508✔
1407
          }
1408
          taosArrayDestroy(pNodeList);
6,208,005✔
1409
        }
1410
      }
1411
      taosArrayDestroy(pMnodeList);
6,208,142✔
1412
      break;
6,209,086✔
1413
    }
1414
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1415
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1416
      break;
×
1417
    default:
×
1418
      break;
×
1419
  }
1420

1421
  if (!keepQuery) {
6,549,462✔
1422
    qDestroyQuery(pQuery);
×
1423
  }
1424

1425
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
6,549,462✔
1426
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
6,612✔
1427
    if (TSDB_CODE_SUCCESS != ret) {
6,612✔
1428
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1429
               pRequest->requestId);
1430
    }
1431
  }
1432

1433
  if (TSDB_CODE_SUCCESS == code) {
6,548,869✔
1434
    code = handleQueryExecRsp(pRequest);
6,548,035✔
1435
  }
1436

1437
  if (TSDB_CODE_SUCCESS != code) {
6,547,893✔
1438
    pRequest->code = code;
4,858✔
1439
  }
1440

1441
  if (res) {
6,547,893✔
1442
    *res = pRequest->body.resInfo.execRes.res;
×
1443
    pRequest->body.resInfo.execRes.res = NULL;
×
1444
  }
1445
}
6,547,893✔
1446

1447
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
590,451,581✔
1448
                                 SSqlCallbackWrapper* pWrapper) {
1449
  int32_t code = TSDB_CODE_SUCCESS;
590,451,581✔
1450
  pRequest->type = pQuery->msgType;
590,451,581✔
1451
  SArray*     pMnodeList = NULL;
590,457,599✔
1452
  SQueryPlan* pDag = NULL;
590,457,599✔
1453
  int64_t     st = taosGetTimestampUs();
590,451,850✔
1454

1455
  if (!pRequest->parseOnly) {
590,451,850✔
1456
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
590,428,962✔
1457
    if (NULL == pMnodeList) {
590,442,969✔
1458
      code = terrno;
×
1459
    }
1460
    SPlanContext cxt = {.queryId = pRequest->requestId,
595,999,451✔
1461
                        .acctId = pRequest->pTscObj->acctId,
590,471,333✔
1462
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
590,486,083✔
1463
                        .pAstRoot = pQuery->pRoot,
590,496,893✔
1464
                        .showRewrite = pQuery->showRewrite,
590,497,549✔
1465
                        .isView = pWrapper->pParseCtx->isView,
590,488,451✔
1466
                        .isAudit = pWrapper->pParseCtx->isAudit,
590,484,752✔
1467
                        .pMsg = pRequest->msgBuf,
590,487,560✔
1468
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1469
                        .pUser = pRequest->pTscObj->user,
590,471,956✔
1470
                        .sysInfo = pRequest->pTscObj->sysInfo,
590,469,571✔
1471
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
590,465,701✔
1472
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
590,487,401✔
1473
    if (TSDB_CODE_SUCCESS == code) {
590,469,467✔
1474
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
590,475,266✔
1475
    }
1476
    if (code) {
590,448,796✔
1477
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
268,437✔
1478
               pRequest->requestId);
1479
    } else {
1480
      pRequest->body.subplanNum = pDag->numOfSubplans;
590,180,359✔
1481
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
590,192,901✔
1482
    }
1483
  }
1484

1485
  pRequest->metric.execStart = taosGetTimestampUs();
590,456,667✔
1486
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
590,444,732✔
1487

1488
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
593,241,847✔
1489
    SArray* pNodeList = NULL;
589,963,637✔
1490
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
589,956,806✔
1491
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
79,759,322✔
1492
    }
1493

1494
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
589,977,716✔
1495
                             .requestId = pRequest->requestId,
589,975,018✔
1496
                             .requestObjRefId = pRequest->self};
589,980,757✔
1497
    SSchedulerReq    req = {
592,750,269✔
1498
           .syncReq = false,
1499
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
589,961,646✔
1500
           .pConn = &conn,
1501
           .pNodeList = pNodeList,
1502
           .pDag = pDag,
1503
           .allocatorRefId = pRequest->allocatorRefId,
589,961,646✔
1504
           .sql = pRequest->sqlstr,
589,956,783✔
1505
           .startTs = pRequest->metric.start,
589,976,394✔
1506
           .execFp = schedulerExecCb,
1507
           .cbParam = pWrapper,
1508
           .chkKillFp = chkRequestKilled,
1509
           .chkKillParam = (void*)pRequest->self,
589,971,813✔
1510
           .pExecRes = NULL,
1511
           .source = pRequest->source,
589,957,696✔
1512
           .pWorkerCb = getTaskPoolWorkerCb(),
589,945,191✔
1513
    };
1514
    if (TSDB_CODE_SUCCESS == code) {
589,957,295✔
1515
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
589,993,312✔
1516
    }
1517

1518
    taosArrayDestroy(pNodeList);
589,956,201✔
1519
  } else {
1520
    qDestroyQueryPlan(pDag);
514,801✔
1521
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
490,570✔
1522
             pRequest->requestId);
1523
    destorySqlCallbackWrapper(pWrapper);
490,570✔
1524
    pRequest->pWrapper = NULL;
490,570✔
1525
    if (TSDB_CODE_SUCCESS != code) {
490,570✔
1526
      pRequest->code = terrno;
268,437✔
1527
    }
1528

1529
    doRequestCallback(pRequest, code);
490,570✔
1530
  }
1531

1532
  // todo not to be released here
1533
  taosArrayDestroy(pMnodeList);
590,490,884✔
1534

1535
  return code;
590,474,804✔
1536
}
1537

1538
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
612,585,646✔
1539
  int32_t code = 0;
612,585,646✔
1540

1541
  if (pRequest->parseOnly) {
612,585,646✔
1542
    doRequestCallback(pRequest, 0);
286,118✔
1543
    return;
286,118✔
1544
  }
1545

1546
  pRequest->body.execMode = pQuery->execMode;
612,316,121✔
1547
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
612,300,142✔
1548
    destorySqlCallbackWrapper(pWrapper);
21,840,069✔
1549
    pRequest->pWrapper = NULL;
21,840,069✔
1550
  }
1551

1552
  if (pQuery->pRoot && !pRequest->inRetry) {
612,287,468✔
1553
    STscObj*            pTscObj = pRequest->pTscObj;
612,305,341✔
1554
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
612,289,865✔
1555
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
612,302,061✔
1556
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
510,220,186✔
1557
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
444,030,413✔
1558
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
168,274,657✔
1559
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
75,236,966✔
1560
    }
1561
  }
1562

1563
  switch (pQuery->execMode) {
612,292,056✔
1564
    case QUERY_EXEC_MODE_LOCAL:
5,089,052✔
1565
      asyncExecLocalCmd(pRequest, pQuery);
5,089,052✔
1566
      break;
5,089,052✔
1567
    case QUERY_EXEC_MODE_RPC:
16,394,737✔
1568
      code = asyncExecDdlQuery(pRequest, pQuery);
16,394,737✔
1569
      break;
16,395,048✔
1570
    case QUERY_EXEC_MODE_SCHEDULE: {
590,472,877✔
1571
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
590,472,877✔
1572
      break;
590,480,635✔
1573
    }
1574
    case QUERY_EXEC_MODE_EMPTY_RESULT:
356,783✔
1575
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
356,783✔
1576
      doRequestCallback(pRequest, 0);
356,783✔
1577
      break;
356,783✔
1578
    default:
×
1579
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1580
      doRequestCallback(pRequest, -1);
×
1581
      break;
×
1582
  }
1583
}
1584

1585
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
11,234✔
1586
  SCatalog* pCatalog = NULL;
11,234✔
1587
  int32_t   code = 0;
11,234✔
1588
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
11,234✔
1589
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
11,234✔
1590

1591
  if (dbNum <= 0 && tblNum <= 0) {
11,234✔
1592
    return TSDB_CODE_APP_ERROR;
11,202✔
1593
  }
1594

1595
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
32✔
1596
  if (code != TSDB_CODE_SUCCESS) {
32✔
1597
    return code;
×
1598
  }
1599

1600
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
32✔
1601
                           .requestId = pRequest->requestId,
32✔
1602
                           .requestObjRefId = pRequest->self,
32✔
1603
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
32✔
1604

1605
  for (int32_t i = 0; i < dbNum; ++i) {
64✔
1606
    char* dbFName = taosArrayGet(pRequest->dbList, i);
32✔
1607

1608
    // catalogRefreshDBVgInfo will handle dbFName == null.
1609
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
32✔
1610
    if (code != TSDB_CODE_SUCCESS) {
32✔
1611
      return code;
×
1612
    }
1613
  }
1614

1615
  for (int32_t i = 0; i < tblNum; ++i) {
64✔
1616
    SName* tableName = taosArrayGet(pRequest->tableList, i);
32✔
1617

1618
    // catalogRefreshTableMeta will handle tableName == null.
1619
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
32✔
1620
    if (code != TSDB_CODE_SUCCESS) {
32✔
1621
      return code;
×
1622
    }
1623
  }
1624

1625
  return code;
32✔
1626
}
1627

1628
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
4,099,219✔
1629
  SCatalog* pCatalog = NULL;
4,099,219✔
1630
  int32_t   tbNum = taosArrayGetSize(tbList);
4,099,219✔
1631
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,099,219✔
1632
  if (code != TSDB_CODE_SUCCESS) {
4,099,219✔
1633
    return code;
×
1634
  }
1635

1636
  if (isView) {
4,099,219✔
1637
    for (int32_t i = 0; i < tbNum; ++i) {
814,466✔
1638
      SName* pViewName = taosArrayGet(tbList, i);
407,233✔
1639
      char   dbFName[TSDB_DB_FNAME_LEN];
404,683✔
1640
      if (NULL == pViewName) {
407,233✔
1641
        continue;
×
1642
      }
1643
      (void)tNameGetFullDbName(pViewName, dbFName);
407,233✔
1644
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
407,233✔
1645
    }
1646
  } else {
1647
    for (int32_t i = 0; i < tbNum; ++i) {
5,508,901✔
1648
      SName* pTbName = taosArrayGet(tbList, i);
1,816,915✔
1649
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
1,816,915✔
1650
    }
1651
  }
1652

1653
  return TSDB_CODE_SUCCESS;
4,099,219✔
1654
}
1655

1656
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
2,689,386✔
1657
  pEpSet->version = 0;
2,689,386✔
1658

1659
  // init mnode ip set
1660
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
2,689,697✔
1661
  mgmtEpSet->numOfEps = 0;
2,689,697✔
1662
  mgmtEpSet->inUse = 0;
2,689,489✔
1663

1664
  if (firstEp && firstEp[0] != 0) {
2,689,697✔
1665
    if (strlen(firstEp) >= TSDB_EP_LEN) {
2,689,752✔
1666
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1667
      return -1;
×
1668
    }
1669

1670
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
2,689,752✔
1671
    if (code != TSDB_CODE_SUCCESS) {
2,689,743✔
1672
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1673
      return terrno;
×
1674
    }
1675
    // uint32_t addr = 0;
1676
    SIpAddr addr = {0};
2,689,743✔
1677
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
2,689,743✔
1678
    if (code) {
2,689,711✔
1679
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
544✔
1680
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1681
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
544✔
1682
    } else {
1683
      mgmtEpSet->numOfEps++;
2,689,167✔
1684
    }
1685
  }
1686

1687
  if (secondEp && secondEp[0] != 0) {
2,689,007✔
1688
    if (strlen(secondEp) >= TSDB_EP_LEN) {
1,775,810✔
1689
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1690
      return terrno;
×
1691
    }
1692

1693
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
1,775,810✔
1694
    if (code != TSDB_CODE_SUCCESS) {
1,776,514✔
1695
      return code;
×
1696
    }
1697
    SIpAddr addr = {0};
1,776,514✔
1698
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
1,775,865✔
1699
    if (code) {
1,776,514✔
1700
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1701
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1702
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1703
    } else {
1704
      mgmtEpSet->numOfEps++;
1,776,514✔
1705
    }
1706
  }
1707

1708
  if (mgmtEpSet->numOfEps == 0) {
2,690,111✔
1709
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
544✔
1710
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
544✔
1711
  }
1712

1713
  return 0;
2,689,167✔
1714
}
1715

1716
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
2,689,273✔
1717
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1718
  *pTscObj = NULL;
2,689,273✔
1719
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
2,689,273✔
1720
  if (TSDB_CODE_SUCCESS != code) {
2,689,273✔
1721
    return code;
×
1722
  }
1723

1724
  SRequestObj* pRequest = NULL;
2,689,273✔
1725
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
2,689,273✔
1726
  if (TSDB_CODE_SUCCESS != code) {
2,689,218✔
1727
    destroyTscObj(*pTscObj);
×
1728
    return code;
×
1729
  }
1730

1731
  pRequest->sqlstr = taosStrdup("taos_connect");
2,689,218✔
1732
  if (pRequest->sqlstr) {
2,689,273✔
1733
    pRequest->sqlLen = strlen(pRequest->sqlstr);
2,689,273✔
1734
  } else {
1735
    return terrno;
×
1736
  }
1737

1738
  SMsgSendInfo* body = NULL;
2,689,273✔
1739
  code = buildConnectMsg(pRequest, &body, totpCode);
2,689,273✔
1740
  if (TSDB_CODE_SUCCESS != code) {
2,688,022✔
1741
    destroyTscObj(*pTscObj);
×
1742
    return code;
×
1743
  }
1744

1745
  // int64_t transporterId = 0;
1746
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
2,688,022✔
1747
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
2,688,879✔
1748
  if (TSDB_CODE_SUCCESS != code) {
2,689,273✔
1749
    destroyTscObj(*pTscObj);
×
1750
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1751
    return code;
×
1752
  }
1753
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
2,689,273✔
1754
    destroyTscObj(*pTscObj);
×
1755
    tscError("failed to wait sem, code:%s", terrstr());
×
1756
    return terrno;
×
1757
  }
1758
  if (pRequest->code != TSDB_CODE_SUCCESS) {
2,689,273✔
1759
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
11,332✔
1760
    tscError("failed to connect to server, reason: %s", errorMsg);
11,332✔
1761

1762
    terrno = pRequest->code;
11,332✔
1763
    destroyRequest(pRequest);
11,332✔
1764
    taos_close_internal(*pTscObj);
11,332✔
1765
    *pTscObj = NULL;
11,332✔
1766
    return terrno;
11,332✔
1767
  }
1768
  if (connType == CONN_TYPE__AUTH_TEST) {
2,677,941✔
1769
    terrno = TSDB_CODE_SUCCESS;
×
1770
    destroyRequest(pRequest);
×
1771
    taos_close_internal(*pTscObj);
×
1772
    *pTscObj = NULL;
×
1773
    return TSDB_CODE_SUCCESS;
×
1774
  }
1775

1776
  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
2,677,941✔
1777
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1778
  destroyRequest(pRequest);
2,677,973✔
1779
  return code;
2,677,941✔
1780
}
1781

1782
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
2,689,218✔
1783
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,689,218✔
1784
  if (*pMsgSendInfo == NULL) {
2,689,218✔
1785
    return terrno;
×
1786
  }
1787

1788
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
2,688,969✔
1789

1790
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
2,688,969✔
1791
  (*pMsgSendInfo)->requestId = pRequest->requestId;
2,689,037✔
1792
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
2,688,788✔
1793
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
2,689,037✔
1794
  if (NULL == (*pMsgSendInfo)->param) {
2,689,163✔
1795
    taosMemoryFree(*pMsgSendInfo);
×
1796
    return terrno;
×
1797
  }
1798

1799
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
2,689,163✔
1800

1801
  SConnectReq connectReq = {0};
2,688,982✔
1802
  STscObj*    pObj = pRequest->pTscObj;
2,688,982✔
1803

1804
  char* db = getDbOfConnection(pObj);
2,688,982✔
1805
  if (db != NULL) {
2,689,273✔
1806
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
1,170,565✔
1807
  } else if (terrno) {
1,518,708✔
1808
    taosMemoryFree(*pMsgSendInfo);
×
1809
    return terrno;
×
1810
  }
1811
  taosMemoryFreeClear(db);
2,689,218✔
1812

1813
  connectReq.connType = pObj->connType;
2,689,163✔
1814
  connectReq.pid = appInfo.pid;
2,689,163✔
1815
  connectReq.startTime = appInfo.startTime;
2,689,163✔
1816
  connectReq.totpCode = totpCode;
2,689,163✔
1817

1818
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
2,689,163✔
1819
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
2,688,763✔
1820
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
2,689,163✔
1821
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
2,688,660✔
1822

1823
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
2,688,660✔
1824
  void*   pReq = taosMemoryMalloc(contLen);
2,687,828✔
1825
  if (NULL == pReq) {
2,688,774✔
1826
    taosMemoryFree(*pMsgSendInfo);
×
1827
    return terrno;
×
1828
  }
1829

1830
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
2,688,774✔
1831
    taosMemoryFree(*pMsgSendInfo);
×
1832
    taosMemoryFree(pReq);
×
1833
    return terrno;
×
1834
  }
1835

1836
  (*pMsgSendInfo)->msgInfo.len = contLen;
2,689,218✔
1837
  (*pMsgSendInfo)->msgInfo.pData = pReq;
2,689,218✔
1838
  return TSDB_CODE_SUCCESS;
2,688,271✔
1839
}
1840

1841
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
998,788,417✔
1842
  if (NULL == pEpSet) {
998,788,417✔
1843
    return;
993,747,349✔
1844
  }
1845

1846
  switch (pSendInfo->target.type) {
5,041,068✔
1847
    case TARGET_TYPE_MNODE:
4,160✔
1848
      if (NULL == pTscObj) {
4,160✔
1849
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1850
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1851
        return;
×
1852
      }
1853

1854
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
4,160✔
1855
      SEpSet* pOrig = &originEpset;
4,160✔
1856
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
4,160✔
1857
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
4,160✔
1858
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
4,160✔
1859
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1860
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
4,160✔
1861
      break;
1,028,153✔
1862
    case TARGET_TYPE_VNODE: {
4,794,661✔
1863
      if (NULL == pTscObj) {
4,794,661✔
1864
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1865
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1866
        return;
×
1867
      }
1868

1869
      SCatalog* pCatalog = NULL;
4,794,661✔
1870
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,794,661✔
1871
      if (code != TSDB_CODE_SUCCESS) {
4,794,212✔
1872
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1873
                 tstrerror(code));
1874
        return;
×
1875
      }
1876

1877
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
4,794,212✔
1878
      if (code != TSDB_CODE_SUCCESS) {
4,794,928✔
1879
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1880
                 tstrerror(code));
1881
        return;
×
1882
      }
1883
      taosMemoryFreeClear(pSendInfo->target.dbFName);
4,794,958✔
1884
      break;
4,794,958✔
1885
    }
1886
    default:
242,382✔
1887
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
242,382✔
1888
      break;
242,382✔
1889
  }
1890
}
1891

1892
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
999,384,346✔
1893
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
999,384,346✔
1894
  if (pMsg->info.ahandle == NULL) {
999,385,586✔
1895
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
591,878✔
1896
    rpcFreeCont(pMsg->pCont);
591,878✔
1897
    taosMemoryFree(pEpSet);
591,878✔
1898
    return TSDB_CODE_TSC_INTERNAL_ERROR;
591,878✔
1899
  }
1900

1901
  STscObj* pTscObj = NULL;
998,793,669✔
1902

1903
  STraceId* trace = &pMsg->info.traceId;
998,793,669✔
1904
  char      tbuf[40] = {0};
998,794,348✔
1905
  TRACE_TO_STR(trace, tbuf);
998,793,199✔
1906

1907
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
998,794,054✔
1908
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1909

1910
  if (pSendInfo->requestObjRefId != 0) {
998,794,234✔
1911
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
865,440,396✔
1912
    if (pRequest) {
865,440,429✔
1913
      if (pRequest->self != pSendInfo->requestObjRefId) {
865,237,334✔
1914
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1915
                 pSendInfo->requestObjRefId);
1916

1917
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1918
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1919
        }
1920
        rpcFreeCont(pMsg->pCont);
×
1921
        taosMemoryFree(pEpSet);
×
1922
        destroySendMsgInfo(pSendInfo);
×
1923
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1924
      }
1925
      pTscObj = pRequest->pTscObj;
865,239,865✔
1926
    }
1927
  }
1928

1929
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
998,796,603✔
1930

1931
  SDataBuf buf = {.msgType = pMsg->msgType,
998,787,031✔
1932
                  .len = pMsg->contLen,
998,787,933✔
1933
                  .pData = NULL,
1934
                  .handle = pMsg->info.handle,
998,790,549✔
1935
                  .handleRefId = pMsg->info.refId,
998,788,952✔
1936
                  .pEpSet = pEpSet};
1937

1938
  if (pMsg->contLen > 0) {
998,790,877✔
1939
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
976,869,553✔
1940
    if (buf.pData == NULL) {
976,866,250✔
1941
      pMsg->code = terrno;
×
1942
    } else {
1943
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
976,866,250✔
1944
    }
1945
  }
1946

1947
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
998,793,897✔
1948

1949
  if (pTscObj) {
998,770,019✔
1950
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
865,224,771✔
1951
    if (TSDB_CODE_SUCCESS != code) {
865,239,352✔
1952
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1953
      terrno = code;
×
1954
      pMsg->code = code;
×
1955
    }
1956
  }
1957

1958
  rpcFreeCont(pMsg->pCont);
998,784,600✔
1959
  destroySendMsgInfo(pSendInfo);
998,768,301✔
1960
  return TSDB_CODE_SUCCESS;
998,752,435✔
1961
}
1962

1963
int32_t doProcessMsgFromServer(void* param) {
999,386,003✔
1964
  AsyncArg* arg = (AsyncArg*)param;
999,386,003✔
1965
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
999,386,003✔
1966
  taosMemoryFree(arg);
999,356,136✔
1967
  return code;
999,334,953✔
1968
}
1969

1970
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
999,375,749✔
1971
  int32_t code = 0;
999,375,749✔
1972
  SEpSet* tEpSet = NULL;
999,375,749✔
1973

1974
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
999,375,749✔
1975

1976
  if (pEpSet != NULL) {
999,378,732✔
1977
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
5,041,557✔
1978
    if (NULL == tEpSet) {
5,041,347✔
1979
      code = terrno;
×
1980
      pMsg->code = terrno;
×
1981
      goto _exit;
×
1982
    }
1983
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
5,041,347✔
1984
  }
1985

1986
  // pMsg is response msg
1987
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
999,378,522✔
1988
    // restore origin code
1989
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
2,689,273✔
1990
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1991
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
2,688,873✔
1992
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1993
    }
1994
  } else {
1995
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1996
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
996,688,915✔
1997
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1998
    }
1999
  }
2000

2001
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
999,378,855✔
2002
  if (NULL == arg) {
999,370,450✔
2003
    code = terrno;
×
2004
    pMsg->code = code;
×
2005
    goto _exit;
×
2006
  }
2007

2008
  arg->msg = *pMsg;
999,370,450✔
2009
  arg->pEpset = tEpSet;
999,375,900✔
2010

2011
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
999,374,972✔
2012
    pMsg->code = code;
×
2013
    taosMemoryFree(arg);
×
2014
    goto _exit;
×
2015
  }
2016
  return;
999,384,180✔
2017

2018
_exit:
×
2019
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
2020
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
2021
  if (code != 0) {
×
2022
    tscError("failed to sched msg to tsc, tsc ready quit");
×
2023
  }
2024
}
2025

2026
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2027
                        uint16_t port) {
2028
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2029
  if (user == NULL) {
×
2030
    user = TSDB_DEFAULT_USER;
×
2031
  }
2032

2033
  if (pass == NULL) {
×
2034
    pass = TSDB_DEFAULT_PASS;
×
2035
  }
2036

2037
  STscObj* pObj = NULL;
×
2038
  int32_t  code = taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__QUERY, &pObj);
×
2039
  if (TSDB_CODE_SUCCESS == code) {
×
2040
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
×
2041
    if (NULL == rid) {
×
2042
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2043
      return NULL;
×
2044
    }
2045
    *rid = pObj->id;
×
2046
    return (TAOS*)rid;
×
2047
  } else {
2048
    terrno = code;
×
2049
  }
2050

2051
  return NULL;
×
2052
}
2053

2054
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2055
                      uint16_t port) {
2056
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2057
  if (user == NULL) {
×
2058
    user = TSDB_DEFAULT_USER;
×
2059
  }
2060

2061
  if (pass == NULL) {
×
2062
    pass = TSDB_DEFAULT_PASS;
×
2063
  }
2064

2065
  STscObj* pObj = NULL;
×
2066
  return taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2067
}
2068

2069
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) { return NULL; }
×
2070

2071
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
544✔
2072
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
544✔
2073
  if (user == NULL) {
544✔
2074
    user = TSDB_DEFAULT_USER;
×
2075
  }
2076

2077
  if (auth == NULL) {
544✔
2078
    tscError("No auth info is given, failed to connect to server");
×
2079
    return NULL;
×
2080
  }
2081

2082
  STscObj* pObj = NULL;
544✔
2083
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
544✔
2084
  if (TSDB_CODE_SUCCESS == code) {
544✔
2085
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
131✔
2086
    if (NULL == rid) {
131✔
2087
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2088
    }
2089
    *rid = pObj->id;
131✔
2090
    return (TAOS*)rid;
131✔
2091
  }
2092

2093
  return NULL;
413✔
2094
}
2095

2096
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2097
//                      const char* db, int dbLen, uint16_t port) {
2098
//   char ipStr[TSDB_EP_LEN] = {0};
2099
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2100
//   char userStr[TSDB_USER_LEN] = {0};
2101
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2102
//
2103
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2104
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2105
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2106
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2107
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2108
// }
2109

2110
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,089,466,248✔
2111
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2112
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2113

2114
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2115
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2116

2117
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647✔
2118
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647✔
2119
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
1,953,225,022✔
2120

2121
        if (IS_STR_DATA_BLOB(type)) {
1,953,306,919✔
2122
          pResultInfo->length[i] = blobDataLen(pStart);
1,389✔
2123
          pResultInfo->row[i] = blobDataVal(pStart);
×
2124
        } else {
2125
          pResultInfo->length[i] = varDataLen(pStart);
1,953,305,742✔
2126
          pResultInfo->row[i] = varDataVal(pStart);
1,953,292,476✔
2127
        }
2128
      } else {
2129
        pResultInfo->row[i] = NULL;
134,787,257✔
2130
        pResultInfo->length[i] = 0;
134,957,140✔
2131
      }
2132
    } else {
2133
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647✔
2134
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2135
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2136
      } else {
2137
        pResultInfo->row[i] = NULL;
474,202,985✔
2138
        pResultInfo->length[i] = 0;
474,991,299✔
2139
      }
2140
    }
2141
  }
2142
}
2,089,862,429✔
2143

2144
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2145
  if (pRequest == NULL) {
×
2146
    return NULL;
×
2147
  }
2148

2149
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2150
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2151
    // All data has returned to App already, no need to try again
2152
    if (pResultInfo->completed) {
×
2153
      pResultInfo->numOfRows = 0;
×
2154
      return NULL;
×
2155
    }
2156

2157
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2158
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2159

2160
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2161
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2162
      pResultInfo->numOfRows = 0;
×
2163
      return NULL;
×
2164
    }
2165

2166
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2167
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
2168
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2169
      pResultInfo->numOfRows = 0;
×
2170
      return NULL;
×
2171
    }
2172

2173
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2174
             ", complete:%d, QID:0x%" PRIx64,
2175
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2176

2177
    STscObj*            pTscObj = pRequest->pTscObj;
×
2178
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2179
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2180

2181
    if (pResultInfo->numOfRows == 0) {
×
2182
      return NULL;
×
2183
    }
2184
  }
2185

2186
  if (setupOneRowPtr) {
×
2187
    doSetOneRowPtr(pResultInfo);
×
2188
    pResultInfo->current += 1;
×
2189
  }
2190

2191
  return pResultInfo->row;
×
2192
}
2193

2194
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
87,537,325✔
2195
  tsem_t* sem = param;
87,537,325✔
2196
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
87,537,325✔
2197
    tscError("failed to post sem, code:%s", terrstr());
×
2198
  }
2199
}
87,537,325✔
2200

2201
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
1,165,733,169✔
2202
  if (pRequest == NULL) {
1,165,733,169✔
2203
    return NULL;
×
2204
  }
2205

2206
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,165,733,169✔
2207
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
1,165,763,303✔
2208
    // All data has returned to App already, no need to try again
2209
    if (pResultInfo->completed) {
156,106,036✔
2210
      pResultInfo->numOfRows = 0;
68,575,500✔
2211
      return NULL;
68,574,580✔
2212
    }
2213

2214
    // convert ucs4 to native multi-bytes string
2215
    pResultInfo->convertUcs4 = convertUcs4;
87,536,313✔
2216
    tsem_t sem;
86,656,799✔
2217
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
87,536,487✔
2218
      tscError("failed to init sem, code:%s", terrstr());
×
2219
    }
2220
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
87,536,901✔
2221
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
87,537,325✔
2222
      tscError("failed to wait sem, code:%s", terrstr());
×
2223
    }
2224
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
87,537,325✔
2225
      tscError("failed to destroy sem, code:%s", terrstr());
×
2226
    }
2227
    pRequest->inCallback = false;
87,536,923✔
2228
  }
2229

2230
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,097,162,954✔
2231
    return NULL;
6,566,256✔
2232
  } else {
2233
    if (setupOneRowPtr) {
1,090,623,557✔
2234
      doSetOneRowPtr(pResultInfo);
1,011,170,635✔
2235
      pResultInfo->current += 1;
1,011,190,119✔
2236
    }
2237

2238
    return pResultInfo->row;
1,090,639,861✔
2239
  }
2240
}
2241

2242
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
115,736,730✔
2243
  if (pResInfo->row == NULL) {
115,736,730✔
2244
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
101,561,369✔
2245
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
101,561,721✔
2246
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
101,561,261✔
2247
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
101,561,457✔
2248

2249
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
101,561,237✔
2250
      taosMemoryFree(pResInfo->row);
×
2251
      taosMemoryFree(pResInfo->pCol);
×
2252
      taosMemoryFree(pResInfo->length);
×
2253
      taosMemoryFree(pResInfo->convertBuf);
×
2254
      return terrno;
×
2255
    }
2256
  }
2257

2258
  return TSDB_CODE_SUCCESS;
115,736,433✔
2259
}
2260

2261
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
115,506,293✔
2262
  int32_t idx = -1;
115,506,293✔
2263
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
115,506,742✔
2264
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
115,505,765✔
2265

2266
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
669,431,004✔
2267
    int32_t type = pResultInfo->fields[i].type;
553,927,401✔
2268
    int32_t schemaBytes =
2269
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
553,927,295✔
2270

2271
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
553,926,492✔
2272
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
19,102,114✔
2273
      if (p == NULL) {
19,102,114✔
2274
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2275
        return terrno;
×
2276
      }
2277

2278
      pResultInfo->convertBuf[i] = p;
19,102,114✔
2279

2280
      SResultColumn* pCol = &pResultInfo->pCol[i];
19,102,114✔
2281
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2282
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2283
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2284

2285
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2286
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647✔
2287
            tscError(
373✔
2288
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2289
                "colLength[i]):%p",
2290
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2291
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
373✔
2292
            return TSDB_CODE_TSC_INTERNAL_ERROR;
74✔
2293
          }
2294

2295
          varDataSetLen(p, len);
2,147,483,647✔
2296
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2297
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2298
        }
2299
      }
2300

2301
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
19,102,040✔
2302
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
19,102,040✔
2303
    }
2304
  }
2305
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
115,506,154✔
2306
  return TSDB_CODE_SUCCESS;
115,506,208✔
2307
}
2308

2309
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
115,505,427✔
2310
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
669,432,197✔
2311
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
553,928,529✔
2312
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
553,926,739✔
2313
    int32_t       type = pFieldE->type;
553,928,741✔
2314
    int32_t       bufLen = 0;
553,927,580✔
2315
    char*         p = NULL;
553,927,580✔
2316
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
553,927,580✔
2317
      continue;
552,329,003✔
2318
    } else {
2319
      bufLen = 64;
1,599,138✔
2320
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
1,599,138✔
2321
      pFieldE->bytes = bufLen;
1,599,138✔
2322
      pField->bytes = bufLen;
1,599,138✔
2323
    }
2324
    if (!p) return terrno;
1,599,138✔
2325
    pResultInfo->convertBuf[i] = p;
1,599,138✔
2326

2327
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
995,716,656✔
2328
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
994,117,518✔
2329
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
994,117,518✔
2330
      p += bufLen;
994,117,518✔
2331
      if (TSDB_CODE_SUCCESS != code) {
994,117,518✔
2332
        return code;
×
2333
      }
2334
    }
2335
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
1,599,138✔
2336
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,599,138✔
2337
  }
2338
  return 0;
115,506,228✔
2339
}
2340

2341
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
386,396✔
2342
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
772,792✔
2343
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
386,396✔
2344
}
2345

2346
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
193,198✔
2347
  char*   p = (char*)pResultInfo->pData;
193,198✔
2348
  int32_t blockVersion = *(int32_t*)p;
193,198✔
2349

2350
  int32_t numOfRows = pResultInfo->numOfRows;
193,198✔
2351
  int32_t numOfCols = pResultInfo->numOfCols;
193,198✔
2352

2353
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2354
  // length |
2355
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
193,198✔
2356
  if (numOfCols != cols) {
193,198✔
2357
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2358
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2359
  }
2360

2361
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
193,198✔
2362
  int32_t* colLength = (int32_t*)(p + len);
193,198✔
2363
  len += sizeof(int32_t) * numOfCols;
193,198✔
2364

2365
  char* pStart = p + len;
193,198✔
2366
  for (int32_t i = 0; i < numOfCols; ++i) {
840,097✔
2367
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
646,899✔
2368

2369
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
646,899✔
2370
      int32_t* offset = (int32_t*)pStart;
229,052✔
2371
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
229,052✔
2372
      len += lenTmp;
229,052✔
2373
      pStart += lenTmp;
229,052✔
2374

2375
      int32_t estimateColLen = 0;
229,052✔
2376
      for (int32_t j = 0; j < numOfRows; ++j) {
1,199,579✔
2377
        if (offset[j] == -1) {
970,527✔
2378
          continue;
48,682✔
2379
        }
2380
        char* data = offset[j] + pStart;
921,845✔
2381

2382
        int32_t jsonInnerType = *data;
921,845✔
2383
        char*   jsonInnerData = data + CHAR_BYTES;
921,845✔
2384
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
921,845✔
2385
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
12,936✔
2386
        } else if (tTagIsJson(data)) {
908,909✔
2387
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
213,142✔
2388
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
695,767✔
2389
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
647,257✔
2390
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,510✔
2391
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
35,574✔
2392
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
12,936✔
2393
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
12,936✔
2394
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2395
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2396
        } else {
2397
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2398
          return -1;
×
2399
        }
2400
      }
2401
      len += TMAX(colLen, estimateColLen);
229,052✔
2402
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
417,847✔
2403
      int32_t lenTmp = numOfRows * sizeof(int32_t);
53,900✔
2404
      len += (lenTmp + colLen);
53,900✔
2405
      pStart += lenTmp;
53,900✔
2406
    } else {
2407
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
363,947✔
2408
      len += (lenTmp + colLen);
363,947✔
2409
      pStart += lenTmp;
363,947✔
2410
    }
2411
    pStart += colLen;
646,899✔
2412
  }
2413

2414
  // Ensure the complete structure of the block, including the blankfill field,
2415
  // even though it is not used on the client side.
2416
  len += sizeof(bool);
193,198✔
2417
  return len;
193,198✔
2418
}
2419

2420
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
115,735,306✔
2421
  int32_t numOfRows = pResultInfo->numOfRows;
115,735,306✔
2422
  int32_t numOfCols = pResultInfo->numOfCols;
115,736,723✔
2423
  bool    needConvert = false;
115,736,521✔
2424
  for (int32_t i = 0; i < numOfCols; ++i) {
670,774,087✔
2425
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
555,230,139✔
2426
      needConvert = true;
193,198✔
2427
      break;
193,198✔
2428
    }
2429
  }
2430

2431
  if (!needConvert) {
115,737,146✔
2432
    return TSDB_CODE_SUCCESS;
115,543,948✔
2433
  }
2434

2435
  tscDebug("start to convert form json format string");
193,198✔
2436

2437
  char*   p = (char*)pResultInfo->pData;
193,198✔
2438
  int32_t blockVersion = *(int32_t*)p;
193,198✔
2439
  int32_t dataLen = estimateJsonLen(pResultInfo);
193,198✔
2440
  if (dataLen <= 0) {
193,198✔
2441
    tscError("doConvertJson error: estimateJsonLen failed");
×
2442
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2443
  }
2444

2445
  taosMemoryFreeClear(pResultInfo->convertJson);
193,198✔
2446
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
193,198✔
2447
  if (pResultInfo->convertJson == NULL) return terrno;
193,198✔
2448
  char* p1 = pResultInfo->convertJson;
193,198✔
2449

2450
  int32_t totalLen = 0;
193,198✔
2451
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
193,198✔
2452
  if (numOfCols != cols) {
193,198✔
2453
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2454
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2455
  }
2456

2457
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
193,198✔
2458
  (void)memcpy(p1, p, len);
193,198✔
2459

2460
  p += len;
193,198✔
2461
  p1 += len;
193,198✔
2462
  totalLen += len;
193,198✔
2463

2464
  len = sizeof(int32_t) * numOfCols;
193,198✔
2465
  int32_t* colLength = (int32_t*)p;
193,198✔
2466
  int32_t* colLength1 = (int32_t*)p1;
193,198✔
2467
  (void)memcpy(p1, p, len);
193,198✔
2468
  p += len;
193,198✔
2469
  p1 += len;
193,198✔
2470
  totalLen += len;
193,198✔
2471

2472
  char* pStart = p;
193,198✔
2473
  char* pStart1 = p1;
193,198✔
2474
  for (int32_t i = 0; i < numOfCols; ++i) {
840,097✔
2475
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
646,899✔
2476
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
646,899✔
2477
    if (colLen >= dataLen) {
646,899✔
2478
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2479
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2480
    }
2481
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
646,899✔
2482
      int32_t* offset = (int32_t*)pStart;
229,052✔
2483
      int32_t* offset1 = (int32_t*)pStart1;
229,052✔
2484
      len = numOfRows * sizeof(int32_t);
229,052✔
2485
      (void)memcpy(pStart1, pStart, len);
229,052✔
2486
      pStart += len;
229,052✔
2487
      pStart1 += len;
229,052✔
2488
      totalLen += len;
229,052✔
2489

2490
      len = 0;
229,052✔
2491
      for (int32_t j = 0; j < numOfRows; ++j) {
1,199,579✔
2492
        if (offset[j] == -1) {
970,527✔
2493
          continue;
48,682✔
2494
        }
2495
        char* data = offset[j] + pStart;
921,845✔
2496

2497
        int32_t jsonInnerType = *data;
921,845✔
2498
        char*   jsonInnerData = data + CHAR_BYTES;
921,845✔
2499
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
921,845✔
2500
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
921,845✔
2501
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
12,936✔
2502
          varDataSetLen(dst, strlen(varDataVal(dst)));
12,936✔
2503
        } else if (tTagIsJson(data)) {
908,909✔
2504
          char* jsonString = NULL;
213,142✔
2505
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
213,142✔
2506
          if (jsonString == NULL) {
213,142✔
2507
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2508
            return terrno;
×
2509
          }
2510
          STR_TO_VARSTR(dst, jsonString);
213,142✔
2511
          taosMemoryFree(jsonString);
213,142✔
2512
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
695,767✔
2513
          *(char*)varDataVal(dst) = '\"';
647,257✔
2514
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
647,257✔
2515
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
647,257✔
2516
                                         pResultInfo->charsetCxt);
2517
          if (length <= 0) {
647,257✔
2518
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
539✔
2519
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2520
            length = 0;
539✔
2521
          }
2522
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
647,257✔
2523
                                                 varDataVal(tmp), length);
2524
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
647,257✔
2525
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
647,257✔
2526
          tscError("value:%s.", varDataVal(dst));
647,257✔
2527
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,510✔
2528
          double jsonVd = *(double*)(jsonInnerData);
35,574✔
2529
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
35,574✔
2530
          varDataSetLen(dst, strlen(varDataVal(dst)));
35,574✔
2531
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
12,936✔
2532
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
12,936✔
2533
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
12,936✔
2534
          varDataSetLen(dst, strlen(varDataVal(dst)));
12,936✔
2535
        } else {
2536
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2537
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2538
        }
2539

2540
        offset1[j] = len;
921,845✔
2541
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
921,845✔
2542
        len += varDataTLen(dst);
921,845✔
2543
      }
2544
      colLen1 = len;
229,052✔
2545
      totalLen += colLen1;
229,052✔
2546
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
229,052✔
2547
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
417,847✔
2548
      len = numOfRows * sizeof(int32_t);
53,900✔
2549
      (void)memcpy(pStart1, pStart, len);
53,900✔
2550
      pStart += len;
53,900✔
2551
      pStart1 += len;
53,900✔
2552
      totalLen += len;
53,900✔
2553
      totalLen += colLen;
53,900✔
2554
      (void)memcpy(pStart1, pStart, colLen);
53,900✔
2555
    } else {
2556
      len = BitmapLen(pResultInfo->numOfRows);
363,947✔
2557
      (void)memcpy(pStart1, pStart, len);
363,947✔
2558
      pStart += len;
363,947✔
2559
      pStart1 += len;
363,947✔
2560
      totalLen += len;
363,947✔
2561
      totalLen += colLen;
363,947✔
2562
      (void)memcpy(pStart1, pStart, colLen);
363,947✔
2563
    }
2564
    pStart += colLen;
646,899✔
2565
    pStart1 += colLen1;
646,899✔
2566
  }
2567

2568
  // Ensure the complete structure of the block, including the blankfill field,
2569
  // even though it is not used on the client side.
2570
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2571
  totalLen += sizeof(bool);
193,198✔
2572

2573
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
193,198✔
2574
  pResultInfo->pData = pResultInfo->convertJson;
193,198✔
2575
  return TSDB_CODE_SUCCESS;
193,198✔
2576
}
2577

2578
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
122,455,873✔
2579
  bool convertForDecimal = convertUcs4;
122,455,873✔
2580
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
122,455,873✔
2581
    tscError("setResultDataPtr paras error");
×
2582
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2583
  }
2584

2585
  if (pResultInfo->numOfRows == 0) {
122,456,049✔
2586
    return TSDB_CODE_SUCCESS;
6,719,231✔
2587
  }
2588

2589
  if (pResultInfo->pData == NULL) {
115,736,818✔
2590
    tscError("setResultDataPtr error: pData is NULL");
×
2591
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2592
  }
2593

2594
  int32_t code = doPrepareResPtr(pResultInfo);
115,736,250✔
2595
  if (code != TSDB_CODE_SUCCESS) {
115,736,450✔
2596
    return code;
×
2597
  }
2598
  code = doConvertJson(pResultInfo);
115,736,450✔
2599
  if (code != TSDB_CODE_SUCCESS) {
115,735,709✔
2600
    return code;
×
2601
  }
2602

2603
  char* p = (char*)pResultInfo->pData;
115,735,709✔
2604

2605
  // version:
2606
  int32_t blockVersion = *(int32_t*)p;
115,735,709✔
2607
  p += sizeof(int32_t);
115,735,797✔
2608

2609
  int32_t dataLen = *(int32_t*)p;
115,735,993✔
2610
  p += sizeof(int32_t);
115,735,905✔
2611

2612
  int32_t rows = *(int32_t*)p;
115,735,729✔
2613
  p += sizeof(int32_t);
115,736,081✔
2614

2615
  int32_t cols = *(int32_t*)p;
115,736,081✔
2616
  p += sizeof(int32_t);
115,736,257✔
2617

2618
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
115,736,266✔
2619
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
176✔
2620
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2621
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2622
  }
2623

2624
  int32_t hasColumnSeg = *(int32_t*)p;
115,736,246✔
2625
  p += sizeof(int32_t);
115,736,061✔
2626

2627
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
115,735,885✔
2628
  p += sizeof(uint64_t);
115,735,885✔
2629

2630
  // check fields
2631
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
671,016,976✔
2632
    int8_t type = *(int8_t*)p;
555,282,760✔
2633
    p += sizeof(int8_t);
555,281,574✔
2634

2635
    int32_t bytes = *(int32_t*)p;
555,281,750✔
2636
    p += sizeof(int32_t);
555,283,013✔
2637

2638
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
555,282,375✔
2639
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
314,438✔
2640
    }
2641
  }
2642

2643
  int32_t* colLength = (int32_t*)p;
115,736,918✔
2644
  p += sizeof(int32_t) * pResultInfo->numOfCols;
115,736,918✔
2645

2646
  char* pStart = p;
115,736,918✔
2647
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
671,023,268✔
2648
    if ((pStart - pResultInfo->pData) >= dataLen) {
555,285,357✔
2649
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2650
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2651
    }
2652
    if (blockVersion == BLOCK_VERSION_1) {
555,285,718✔
2653
      colLength[i] = htonl(colLength[i]);
421,447,811✔
2654
    }
2655
    if (colLength[i] >= dataLen) {
555,285,292✔
2656
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2657
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2658
    }
2659
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
555,285,644✔
2660
      tscError("invalid type %d", pResultInfo->fields[i].type);
410✔
2661
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2662
    }
2663
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
555,285,509✔
2664
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
137,169,031✔
2665
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
137,168,848✔
2666
    } else {
2667
      pResultInfo->pCol[i].nullbitmap = pStart;
418,117,279✔
2668
      pStart += BitmapLen(pResultInfo->numOfRows);
418,116,860✔
2669
    }
2670

2671
    pResultInfo->pCol[i].pData = pStart;
555,284,925✔
2672
    pResultInfo->length[i] =
1,110,572,292✔
2673
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
1,105,075,062✔
2674
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
555,286,920✔
2675

2676
    pStart += colLength[i];
555,286,647✔
2677
  }
2678

2679
  p = pStart;
115,737,146✔
2680
  // bool blankFill = *(bool*)p;
2681
  p += sizeof(bool);
115,737,146✔
2682
  int32_t offset = p - pResultInfo->pData;
115,737,146✔
2683
  if (offset > dataLen) {
115,736,697✔
2684
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2685
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2686
  }
2687

2688
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2689
  if (convertUcs4) {
115,736,697✔
2690
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
115,506,293✔
2691
  }
2692
#endif
2693
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
115,736,686✔
2694
    code = convertDecimalType(pResultInfo);
115,506,208✔
2695
  }
2696
  return code;
115,736,794✔
2697
}
2698

2699
char* getDbOfConnection(STscObj* pObj) {
638,164,921✔
2700
  terrno = TSDB_CODE_SUCCESS;
638,164,921✔
2701
  char* p = NULL;
638,167,345✔
2702
  (void)taosThreadMutexLock(&pObj->mutex);
638,167,345✔
2703
  size_t len = strlen(pObj->db);
638,170,236✔
2704
  if (len > 0) {
638,170,491✔
2705
    p = taosStrndup(pObj->db, tListLen(pObj->db));
572,053,595✔
2706
    if (p == NULL) {
572,046,856✔
2707
      tscError("failed to taosStrndup db name");
×
2708
    }
2709
  }
2710

2711
  (void)taosThreadMutexUnlock(&pObj->mutex);
638,163,752✔
2712
  return p;
638,163,233✔
2713
}
2714

2715
void setConnectionDB(STscObj* pTscObj, const char* db) {
2,451,533✔
2716
  if (db == NULL || pTscObj == NULL) {
2,451,533✔
2717
    tscError("setConnectionDB para is NULL");
×
2718
    return;
×
2719
  }
2720

2721
  (void)taosThreadMutexLock(&pTscObj->mutex);
2,451,533✔
2722
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
2,451,533✔
2723
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
2,451,533✔
2724
}
2725

2726
void resetConnectDB(STscObj* pTscObj) {
×
2727
  if (pTscObj == NULL) {
×
2728
    return;
×
2729
  }
2730

2731
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2732
  pTscObj->db[0] = 0;
×
2733
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2734
}
2735

2736
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
92,383,753✔
2737
                              bool isStmt) {
2738
  if (pResultInfo == NULL || pRsp == NULL) {
92,383,753✔
2739
    tscError("setQueryResultFromRsp paras is null");
×
2740
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2741
  }
2742

2743
  taosMemoryFreeClear(pResultInfo->pRspMsg);
92,383,753✔
2744
  pResultInfo->pRspMsg = (const char*)pRsp;
92,383,753✔
2745
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
92,383,557✔
2746
  pResultInfo->current = 0;
92,383,753✔
2747
  pResultInfo->completed = (pRsp->completed == 1);
92,383,753✔
2748
  pResultInfo->precision = pRsp->precision;
92,383,753✔
2749

2750
  // decompress data if needed
2751
  int32_t payloadLen = htonl(pRsp->payloadLen);
92,383,557✔
2752

2753
  if (pRsp->compressed) {
92,383,753✔
2754
    if (pResultInfo->decompBuf == NULL) {
×
2755
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
2756
      if (pResultInfo->decompBuf == NULL) {
×
2757
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2758
        return terrno;
×
2759
      }
2760
      pResultInfo->decompBufSize = payloadLen;
×
2761
    } else {
2762
      if (pResultInfo->decompBufSize < payloadLen) {
×
2763
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
2764
        if (p == NULL) {
×
2765
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2766
          return terrno;
×
2767
        }
2768

2769
        pResultInfo->decompBuf = p;
×
2770
        pResultInfo->decompBufSize = payloadLen;
×
2771
      }
2772
    }
2773
  }
2774

2775
  if (payloadLen > 0) {
92,383,753✔
2776
    int32_t compLen = *(int32_t*)pRsp->data;
85,664,904✔
2777
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
85,664,904✔
2778

2779
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
85,664,904✔
2780

2781
    if (pRsp->compressed && compLen < rawLen) {
85,664,708✔
2782
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
2783
      if (len < 0) {
×
2784
        tscError("tsDecompressString failed");
×
2785
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2786
      }
2787
      if (len != rawLen) {
×
2788
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2789
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2790
      }
2791
      pResultInfo->pData = pResultInfo->decompBuf;
×
2792
      pResultInfo->payloadLen = rawLen;
×
2793
    } else {
2794
      pResultInfo->pData = pStart;
85,664,904✔
2795
      pResultInfo->payloadLen = htonl(pRsp->compLen);
85,664,904✔
2796
      if (pRsp->compLen != pRsp->payloadLen) {
85,664,904✔
2797
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2798
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2799
      }
2800
    }
2801
  }
2802

2803
  // TODO handle the compressed case
2804
  pResultInfo->totalRows += pResultInfo->numOfRows;
92,383,753✔
2805

2806
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
92,383,557✔
2807
  return code;
92,383,753✔
2808
}
2809

2810
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
682✔
2811
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
682✔
2812
  void*              clientRpc = NULL;
682✔
2813
  SServerStatusRsp   statusRsp = {0};
682✔
2814
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
682✔
2815
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
682✔
2816
  SRpcMsg  rpcRsp = {0};
682✔
2817
  SRpcInit rpcInit = {0};
682✔
2818
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
682✔
2819

2820
  rpcInit.label = "CHK";
682✔
2821
  rpcInit.numOfThreads = 1;
682✔
2822
  rpcInit.cfp = NULL;
682✔
2823
  rpcInit.sessions = 16;
682✔
2824
  rpcInit.connType = TAOS_CONN_CLIENT;
682✔
2825
  rpcInit.idleTime = tsShellActivityTimer * 1000;
682✔
2826
  rpcInit.compressSize = tsCompressMsgSize;
682✔
2827
  rpcInit.user = "_dnd";
682✔
2828

2829
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
682✔
2830
  connLimitNum = TMAX(connLimitNum, 10);
682✔
2831
  connLimitNum = TMIN(connLimitNum, 500);
682✔
2832
  rpcInit.connLimitNum = connLimitNum;
682✔
2833
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
682✔
2834
  rpcInit.readTimeout = tsReadTimeout;
682✔
2835
  rpcInit.ipv6 = tsEnableIpv6;
682✔
2836
  rpcInit.enableSSL = tsEnableTLS;
682✔
2837

2838
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
682✔
2839
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
682✔
2840
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
682✔
2841
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
682✔
2842
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
682✔
2843

2844
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
682✔
2845
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2846
    goto _OVER;
×
2847
  }
2848

2849
  clientRpc = rpcOpen(&rpcInit);
682✔
2850
  if (clientRpc == NULL) {
682✔
2851
    code = terrno;
×
2852
    tscError("failed to init server status client since %s", tstrerror(code));
×
2853
    goto _OVER;
×
2854
  }
2855

2856
  if (fqdn == NULL) {
682✔
2857
    fqdn = tsLocalFqdn;
682✔
2858
  }
2859

2860
  if (port == 0) {
682✔
2861
    port = tsServerPort;
682✔
2862
  }
2863

2864
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
682✔
2865
  epSet.eps[0].port = (uint16_t)port;
682✔
2866
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
682✔
2867
  if (TSDB_CODE_SUCCESS != ret) {
682✔
2868
    tscError("failed to send recv since %s", tstrerror(ret));
×
2869
    goto _OVER;
×
2870
  }
2871

2872
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
682✔
2873
    tscError("failed to send server status req since %s", terrstr());
128✔
2874
    goto _OVER;
128✔
2875
  }
2876

2877
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
554✔
2878
    tscError("failed to parse server status rsp since %s", terrstr());
×
2879
    goto _OVER;
×
2880
  }
2881

2882
  code = statusRsp.statusCode;
554✔
2883
  if (details != NULL) {
554✔
2884
    tstrncpy(details, statusRsp.details, maxlen);
554✔
2885
  }
2886

2887
_OVER:
649✔
2888
  if (clientRpc != NULL) {
682✔
2889
    rpcClose(clientRpc);
682✔
2890
  }
2891
  if (rpcRsp.pCont != NULL) {
682✔
2892
    rpcFreeCont(rpcRsp.pCont);
554✔
2893
  }
2894
  return code;
682✔
2895
}
2896

2897
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
1,250✔
2898
                      int32_t acctId, char* db) {
2899
  SName name = {0};
1,250✔
2900

2901
  if (len1 <= 0) {
1,250✔
2902
    return -1;
×
2903
  }
2904

2905
  const char* dbName = db;
1,250✔
2906
  const char* tbName = NULL;
1,250✔
2907
  int32_t     dbLen = 0;
1,250✔
2908
  int32_t     tbLen = 0;
1,250✔
2909
  if (len2 > 0) {
1,250✔
2910
    dbName = str + pos1;
×
2911
    dbLen = len1;
×
2912
    tbName = str + pos2;
×
2913
    tbLen = len2;
×
2914
  } else {
2915
    dbLen = strlen(db);
1,250✔
2916
    tbName = str + pos1;
1,250✔
2917
    tbLen = len1;
1,250✔
2918
  }
2919

2920
  if (dbLen <= 0 || tbLen <= 0) {
1,250✔
2921
    return -1;
×
2922
  }
2923

2924
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
1,250✔
2925
    return -1;
×
2926
  }
2927

2928
  if (tNameAddTbName(&name, tbName, tbLen)) {
1,250✔
2929
    return -1;
×
2930
  }
2931

2932
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,250✔
2933
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
1,250✔
2934

2935
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
1,250✔
2936
  if (pDb) {
1,250✔
2937
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2938
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2939
    }
2940
  } else {
2941
    STablesReq db;
1,250✔
2942
    db.pTables = taosArrayInit(20, sizeof(SName));
1,250✔
2943
    if (NULL == db.pTables) {
1,250✔
2944
      return terrno;
×
2945
    }
2946
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
1,250✔
2947
    if (NULL == taosArrayPush(db.pTables, &name)) {
2,500✔
2948
      return terrno;
×
2949
    }
2950
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
1,250✔
2951
  }
2952

2953
  return TSDB_CODE_SUCCESS;
1,250✔
2954
}
2955

2956
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
1,250✔
2957
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,250✔
2958
  if (NULL == pHash) {
1,250✔
2959
    return terrno;
×
2960
  }
2961

2962
  bool    inEscape = false;
1,250✔
2963
  int32_t code = 0;
1,250✔
2964
  void*   pIter = NULL;
1,250✔
2965

2966
  int32_t vIdx = 0;
1,250✔
2967
  int32_t vPos[2];
1,250✔
2968
  int32_t vLen[2];
1,250✔
2969

2970
  (void)memset(vPos, -1, sizeof(vPos));
1,250✔
2971
  (void)memset(vLen, 0, sizeof(vLen));
1,250✔
2972

2973
  for (int32_t i = 0;; ++i) {
6,250✔
2974
    if (0 == *(tbList + i)) {
6,250✔
2975
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
1,250✔
2976
        vLen[vIdx] = i - vPos[vIdx];
1,250✔
2977
      }
2978

2979
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
1,250✔
2980
      if (code) {
1,250✔
2981
        goto _return;
×
2982
      }
2983

2984
      break;
1,250✔
2985
    }
2986

2987
    if ('`' == *(tbList + i)) {
5,000✔
2988
      inEscape = !inEscape;
×
2989
      if (!inEscape) {
×
2990
        if (vPos[vIdx] >= 0) {
×
2991
          vLen[vIdx] = i - vPos[vIdx];
×
2992
        } else {
2993
          goto _return;
×
2994
        }
2995
      }
2996

2997
      continue;
×
2998
    }
2999

3000
    if (inEscape) {
5,000✔
3001
      if (vPos[vIdx] < 0) {
×
3002
        vPos[vIdx] = i;
×
3003
      }
3004
      continue;
×
3005
    }
3006

3007
    if ('.' == *(tbList + i)) {
5,000✔
3008
      if (vPos[vIdx] < 0) {
×
3009
        goto _return;
×
3010
      }
3011
      if (vLen[vIdx] <= 0) {
×
3012
        vLen[vIdx] = i - vPos[vIdx];
×
3013
      }
3014
      vIdx++;
×
3015
      if (vIdx >= 2) {
×
3016
        goto _return;
×
3017
      }
3018
      continue;
×
3019
    }
3020

3021
    if (',' == *(tbList + i)) {
5,000✔
3022
      if (vPos[vIdx] < 0) {
×
3023
        goto _return;
×
3024
      }
3025
      if (vLen[vIdx] <= 0) {
×
3026
        vLen[vIdx] = i - vPos[vIdx];
×
3027
      }
3028

3029
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
3030
      if (code) {
×
3031
        goto _return;
×
3032
      }
3033

3034
      (void)memset(vPos, -1, sizeof(vPos));
×
3035
      (void)memset(vLen, 0, sizeof(vLen));
×
3036
      vIdx = 0;
×
3037
      continue;
×
3038
    }
3039

3040
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
5,000✔
3041
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
3042
        vLen[vIdx] = i - vPos[vIdx];
×
3043
      }
3044
      continue;
×
3045
    }
3046

3047
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
5,000✔
3048
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
625✔
3049
      if (vLen[vIdx] > 0) {
5,000✔
3050
        goto _return;
×
3051
      }
3052
      if (vPos[vIdx] < 0) {
5,000✔
3053
        vPos[vIdx] = i;
1,250✔
3054
      }
3055
      continue;
5,000✔
3056
    }
3057

3058
    goto _return;
×
3059
  }
3060

3061
  int32_t dbNum = taosHashGetSize(pHash);
1,250✔
3062
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
1,250✔
3063
  if (NULL == pReq) {
1,250✔
3064
    TSC_ERR_JRET(terrno);
×
3065
  }
3066
  pIter = taosHashIterate(pHash, NULL);
1,250✔
3067
  while (pIter) {
2,500✔
3068
    STablesReq* pDb = (STablesReq*)pIter;
1,250✔
3069
    if (NULL == taosArrayPush(*pReq, pDb)) {
2,500✔
3070
      TSC_ERR_JRET(terrno);
×
3071
    }
3072
    pIter = taosHashIterate(pHash, pIter);
1,250✔
3073
  }
3074

3075
  taosHashCleanup(pHash);
1,250✔
3076

3077
  return TSDB_CODE_SUCCESS;
1,250✔
3078

3079
_return:
×
3080

3081
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3082

3083
  pIter = taosHashIterate(pHash, NULL);
×
3084
  while (pIter) {
×
3085
    STablesReq* pDb = (STablesReq*)pIter;
×
3086
    taosArrayDestroy(pDb->pTables);
×
3087
    pIter = taosHashIterate(pHash, pIter);
×
3088
  }
3089

3090
  taosHashCleanup(pHash);
×
3091

3092
  return terrno;
×
3093
}
3094

3095
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
1,250✔
3096
  SSyncQueryParam* pParam = param;
1,250✔
3097
  pParam->pRequest->code = code;
1,250✔
3098

3099
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,250✔
3100
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3101
  }
3102
}
1,250✔
3103

3104
void syncQueryFn(void* param, void* res, int32_t code) {
631,116,249✔
3105
  SSyncQueryParam* pParam = param;
631,116,249✔
3106
  pParam->pRequest = res;
631,116,249✔
3107

3108
  if (pParam->pRequest) {
631,120,296✔
3109
    pParam->pRequest->code = code;
631,098,924✔
3110
    clientOperateReport(pParam->pRequest);
631,103,847✔
3111
  }
3112

3113
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
631,109,818✔
3114
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3115
  }
3116
}
631,127,338✔
3117

3118
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
630,593,890✔
3119
                        int8_t source) {
3120
  if (sql == NULL || NULL == fp) {
630,593,890✔
3121
    terrno = TSDB_CODE_INVALID_PARA;
567✔
3122
    if (fp) {
×
3123
      fp(param, NULL, terrno);
×
3124
    }
3125

3126
    return;
×
3127
  }
3128

3129
  size_t sqlLen = strlen(sql);
630,593,906✔
3130
  if (sqlLen > (size_t)tsMaxSQLLength) {
630,593,906✔
3131
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
1,248✔
3132
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1,248✔
3133
    fp(param, NULL, terrno);
1,248✔
3134
    return;
1,248✔
3135
  }
3136

3137
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
630,592,658✔
3138

3139
  SRequestObj* pRequest = NULL;
630,593,737✔
3140
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
630,603,752✔
3141
  if (code != TSDB_CODE_SUCCESS) {
630,603,211✔
3142
    terrno = code;
×
3143
    fp(param, NULL, terrno);
×
3144
    return;
×
3145
  }
3146

3147
  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = 1};
630,603,211✔
3148
  code = connUpdateSessMgtMetric(connId, &para);
630,600,247✔
3149
  if (code != TSDB_CODE_SUCCESS) {
630,611,168✔
3150
    terrno = code;
×
3151
    fp(param, NULL, terrno);
×
3152
    return;
×
3153
  }
3154

3155
  pRequest->source = source;
630,611,168✔
3156
  pRequest->body.queryFp = fp;
630,611,168✔
3157
  doAsyncQuery(pRequest, false);
630,611,168✔
3158
}
3159

3160
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
792✔
3161
                                 int64_t reqid) {
3162
  if (sql == NULL || NULL == fp) {
792✔
3163
    terrno = TSDB_CODE_INVALID_PARA;
×
3164
    if (fp) {
×
3165
      fp(param, NULL, terrno);
×
3166
    }
3167

3168
    return;
×
3169
  }
3170

3171
  size_t sqlLen = strlen(sql);
792✔
3172
  if (sqlLen > (size_t)tsMaxSQLLength) {
792✔
3173
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
×
3174
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3175
    fp(param, NULL, terrno);
×
3176
    return;
×
3177
  }
3178

3179
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
792✔
3180

3181
  SRequestObj* pRequest = NULL;
792✔
3182
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
792✔
3183
  if (code != TSDB_CODE_SUCCESS) {
792✔
3184
    terrno = code;
×
3185
    fp(param, NULL, terrno);
×
3186
    return;
×
3187
  }
3188

3189
  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = 1};
792✔
3190
  code = connUpdateSessMgtMetric(connId, &para);
792✔
3191
  if (code != TSDB_CODE_SUCCESS) {
792✔
3192
    terrno = code;
×
3193
    fp(param, NULL, terrno);
×
3194
    return;
×
3195
  }
3196

3197
  pRequest->body.queryFp = fp;
792✔
3198

3199
  doAsyncQuery(pRequest, false);
792✔
3200
}
3201

3202
int32_t connUpdateSessMgtMetric(int64_t connId, SSessParam* pParam) {
630,602,268✔
3203
  int32_t code = 0;
630,602,268✔
3204

3205
  STscObj* pTscObj = acquireTscObj(connId);
630,602,268✔
3206
  if (pTscObj == NULL) {
630,610,893✔
3207
    code = TSDB_CODE_INVALID_PARA;
×
3208
    return code;
×
3209
  }
3210
  code = sessMgtUpdateUserMetric(pTscObj->user, pParam);
630,610,893✔
3211

3212
  releaseTscObj(connId);
630,611,960✔
3213
  return code;
630,611,960✔
3214
}
3215

3216
int32_t tscUpdateSessMgtMetric(STscObj* pTscObj, SSessParam* pParam) {
641,554,941✔
3217
  int32_t code = 0;
641,554,941✔
3218

3219
  if (pTscObj == NULL) {
641,554,941✔
3220
    code = TSDB_CODE_INVALID_PARA;
×
3221
    return code;
×
3222
  }
3223
  code = sessMgtUpdateUserMetric(pTscObj->user, pParam);
641,554,941✔
3224

3225
  return code;
641,568,658✔
3226
}
3227

3228
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
630,567,371✔
3229
  if (NULL == taos) {
630,567,371✔
3230
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3231
    return NULL;
×
3232
  }
3233

3234
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
630,567,371✔
3235
  if (NULL == param) {
630,565,842✔
3236
    return NULL;
×
3237
  }
3238

3239
  int32_t code = tsem_init(&param->sem, 0, 0);
630,565,842✔
3240
  if (TSDB_CODE_SUCCESS != code) {
630,552,712✔
3241
    taosMemoryFree(param);
×
3242
    return NULL;
×
3243
  }
3244

3245
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
630,552,712✔
3246
  code = tsem_wait(&param->sem);
630,563,937✔
3247
  if (TSDB_CODE_SUCCESS != code) {
630,569,930✔
3248
    taosMemoryFree(param);
×
3249
    return NULL;
×
3250
  }
3251
  code = tsem_destroy(&param->sem);
630,569,930✔
3252
  if (TSDB_CODE_SUCCESS != code) {
630,572,255✔
3253
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3254
  }
3255

3256
  SRequestObj* pRequest = NULL;
630,572,255✔
3257
  if (param->pRequest != NULL) {
630,572,255✔
3258
    param->pRequest->syncQuery = true;
630,570,963✔
3259
    pRequest = param->pRequest;
630,569,678✔
3260
    param->pRequest->inCallback = false;
630,570,910✔
3261
  }
3262
  taosMemoryFree(param);
630,570,869✔
3263

3264
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3265
  //          *(int64_t*)taos, pRequest);
3266

3267
  return pRequest;
630,568,769✔
3268
}
3269

3270
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
792✔
3271
  if (NULL == taos) {
792✔
3272
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3273
    return NULL;
×
3274
  }
3275

3276
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
792✔
3277
  if (param == NULL) {
792✔
3278
    return NULL;
×
3279
  }
3280
  int32_t code = tsem_init(&param->sem, 0, 0);
792✔
3281
  if (TSDB_CODE_SUCCESS != code) {
792✔
3282
    taosMemoryFree(param);
×
3283
    return NULL;
×
3284
  }
3285

3286
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
792✔
3287
  code = tsem_wait(&param->sem);
792✔
3288
  if (TSDB_CODE_SUCCESS != code) {
792✔
3289
    taosMemoryFree(param);
×
3290
    return NULL;
×
3291
  }
3292
  SRequestObj* pRequest = NULL;
792✔
3293
  if (param->pRequest != NULL) {
792✔
3294
    param->pRequest->syncQuery = true;
792✔
3295
    pRequest = param->pRequest;
792✔
3296
  }
3297
  taosMemoryFree(param);
792✔
3298

3299
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3300
  //   *(int64_t*)taos, pRequest);
3301

3302
  return pRequest;
792✔
3303
}
3304

3305
static void fetchCallback(void* pResult, void* param, int32_t code) {
89,541,303✔
3306
  SRequestObj* pRequest = (SRequestObj*)param;
89,541,303✔
3307

3308
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
89,541,303✔
3309

3310
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
89,541,303✔
3311
           tstrerror(code), pRequest->requestId);
3312

3313
  pResultInfo->pData = pResult;
89,541,303✔
3314
  pResultInfo->numOfRows = 0;
89,541,303✔
3315

3316
  if (code != TSDB_CODE_SUCCESS) {
89,541,107✔
3317
    pRequest->code = code;
×
3318
    taosMemoryFreeClear(pResultInfo->pData);
×
3319
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3320
    return;
×
3321
  }
3322

3323
  if (pRequest->code != TSDB_CODE_SUCCESS) {
89,541,107✔
3324
    taosMemoryFreeClear(pResultInfo->pData);
×
3325
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3326
    return;
×
3327
  }
3328

3329
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
90,423,299✔
3330
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
89,541,303✔
3331
  if (pRequest->code != TSDB_CODE_SUCCESS) {
89,541,303✔
3332
    pResultInfo->numOfRows = 0;
74✔
3333
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
74✔
3334
             tstrerror(pRequest->code), pRequest->requestId);
3335
  } else {
3336
    tscDebug(
89,541,001✔
3337
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3338
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3339

3340
    STscObj*            pTscObj = pRequest->pTscObj;
89,541,001✔
3341
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
89,541,229✔
3342
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
89,541,229✔
3343
  }
3344

3345
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
89,540,879✔
3346
}
3347

3348
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
92,896,759✔
3349
  pRequest->body.fetchFp = fp;
92,896,759✔
3350
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
92,896,759✔
3351

3352
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
92,896,759✔
3353

3354
  // this query has no results or error exists, return directly
3355
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
92,896,531✔
3356
    pResultInfo->numOfRows = 0;
×
3357
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3358
    return;
685✔
3359
  }
3360

3361
  // all data has returned to App already, no need to try again
3362
  if (pResultInfo->completed) {
92,896,536✔
3363
    // it is a local executed query, no need to do async fetch
3364
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
3,355,456✔
3365
      if (pResultInfo->localResultFetched) {
1,533,636✔
3366
        pResultInfo->numOfRows = 0;
766,818✔
3367
        pResultInfo->current = 0;
766,818✔
3368
      } else {
3369
        pResultInfo->localResultFetched = true;
766,818✔
3370
      }
3371
    } else {
3372
      pResultInfo->numOfRows = 0;
1,821,820✔
3373
    }
3374

3375
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
3,355,456✔
3376
    return;
3,355,456✔
3377
  }
3378

3379
  SSchedulerReq req = {
89,541,303✔
3380
      .syncReq = false,
3381
      .fetchFp = fetchCallback,
3382
      .cbParam = pRequest,
3383
  };
3384

3385
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
89,541,303✔
3386
  if (TSDB_CODE_SUCCESS != code) {
89,540,619✔
3387
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3388
    // pRequest->body.fetchFp(param, pRequest, code);
3389
  }
3390
}
3391

3392
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
631,093,139✔
3393
  pRequest->inCallback = true;
631,093,139✔
3394
  int64_t this = pRequest->self;
631,100,798✔
3395
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
631,079,592✔
3396
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
82,200✔
3397
    code = TSDB_CODE_SUCCESS;
×
3398
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3399
  }
3400

3401
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
631,079,592✔
3402
           pRequest);
3403

3404
  if (pRequest->body.queryFp != NULL) {
631,080,049✔
3405
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
631,093,433✔
3406
  }
3407

3408

3409
  SRequestObj* pReq = acquireRequest(this);
631,104,043✔
3410
  if (pReq != NULL) {
631,105,214✔
3411
    pReq->inCallback = false;
630,270,150✔
3412
    (void)releaseRequest(this);
630,270,314✔
3413
  }
3414
}
631,098,699✔
3415

3416
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
552,102✔
3417
                       SParseSqlRes* pRes) {
3418
#ifndef TD_ENTERPRISE
3419
  return TSDB_CODE_SUCCESS;
3420
#else
3421
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
552,102✔
3422
#endif
3423
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc