• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4896

24 Dec 2025 07:36AM UTC coverage: 65.929% (+0.4%) from 65.513%
#4896

push

travis-ci

web-flow
enh: [TS-7591] Some code refactor and add more log. (#34022)

326 of 537 new or added lines in 4 files covered. (60.71%)

370 existing lines in 111 files now uncovered.

185828 of 281861 relevant lines covered (65.93%)

116309824.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.32
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
439,175,034✔
40
  SRequestObj* pReq = acquireRequest(rId);
439,175,034✔
41
  if (pReq != NULL) {
439,176,178✔
42
    pReq->isQuery = true;
439,164,135✔
43
    (void)releaseRequest(rId);
439,163,217✔
44
  }
45
}
439,175,260✔
46

47
static bool stringLengthCheck(const char* str, size_t maxsize) {
6,816,878✔
48
  if (str == NULL) {
6,816,878✔
49
    return false;
×
50
  }
51

52
  size_t len = strlen(str);
6,816,878✔
53
  if (len <= 0 || len > maxsize) {
6,816,878✔
54
    return false;
30✔
55
  }
56

57
  return true;
6,816,848✔
58
}
59

60
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
2,805,448✔
61

62
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
2,804,851✔
63

64
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
1,206,467✔
65

66
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
2,803,931✔
67
  char key[512] = {0};
2,803,931✔
68
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
2,804,338✔
69
  return taosStrdup(key);
2,804,338✔
70
}
71

72
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
661,966✔
73
  if (dst == NULL || src == NULL || srcLength == 0) {
661,966✔
74
    return 0;
544✔
75
  }
76

77
  size_t escapeLength = 0;
661,422✔
78
  for (size_t i = 0; i < srcLength; ++i) {
18,747,192✔
79
    if (src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' || src[i] == '\r' ||
18,085,770✔
80
        src[i] == '\t') {
18,085,770✔
81
      escapeLength += 1;
×
82
    }
83
  }
84

85
  size_t dstLength = srcLength;
661,422✔
86
  if (escapeLength == 0) {
661,422✔
87
    (void)memcpy(dst, src, srcLength);
661,422✔
88
  } else {
89
    dstLength = 0;
×
90
    for (size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
×
91
      switch (src[i]) {
×
92
        case '\"':
×
93
          dst[dstLength++] = '\\';
×
94
          dst[dstLength++] = '\"';
×
95
          break;
×
96
        case '\\':
×
97
          dst[dstLength++] = '\\';
×
98
          dst[dstLength++] = '\\';
×
99
          break;
×
100
        case '\b':
×
101
          dst[dstLength++] = '\\';
×
102
          dst[dstLength++] = 'b';
×
103
          break;
×
104
        case '\f':
×
105
          dst[dstLength++] = '\\';
×
106
          dst[dstLength++] = 'f';
×
107
          break;
×
108
        case '\n':
×
109
          dst[dstLength++] = '\\';
×
110
          dst[dstLength++] = 'n';
×
111
          break;
×
112
        case '\r':
×
113
          dst[dstLength++] = '\\';
×
114
          dst[dstLength++] = 'r';
×
115
          break;
×
116
        case '\t':
×
117
          dst[dstLength++] = '\\';
×
118
          dst[dstLength++] = 't';
×
119
          break;
×
120
        default:
×
121
          dst[dstLength++] = src[i];
×
122
      }
123
    }
124
  }
125

126
  return dstLength;
661,422✔
127
}
128

129
bool chkRequestKilled(void* param) {
2,147,483,647✔
130
  bool         killed = false;
2,147,483,647✔
131
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
132
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
133
    killed = true;
×
134
  }
135

136
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
137

138
  return killed;
2,147,483,647✔
139
}
140

141
void cleanupAppInfo() {
1,251,638✔
142
  taosHashCleanup(appInfo.pInstMap);
1,251,638✔
143
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,251,638✔
144
  tscInfo("cluster instance map cleaned");
1,251,638✔
145
}
1,251,638✔
146

147
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
148
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
149
                               STscObj** pTscObj);
150

151
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* totp,
2,805,560✔
152
                              const char* db, uint16_t port, int connType, STscObj** pObj) {
153
  TSC_ERR_RET(taos_init());
2,805,560✔
154
  if (!validateUserName(user)) {
2,805,560✔
155
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
156
  }
157
  int32_t code = 0;
2,805,504✔
158

159
  char localDb[TSDB_DB_NAME_LEN] = {0};
2,805,504✔
160
  if (db != NULL && strlen(db) > 0) {
2,805,504✔
161
    if (!validateDbName(db)) {
1,206,467✔
162
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
163
    }
164

165
    tstrncpy(localDb, db, sizeof(localDb));
1,206,523✔
166
    (void)strdequote(localDb);
1,206,523✔
167
  }
168

169
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
2,805,448✔
170
  if (auth == NULL) {
2,805,448✔
171
    if (!validatePassword(pass)) {
2,805,114✔
172
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
173
    }
174

175
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
2,804,739✔
176
  } else {
177
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
334✔
178
  }
179

180
  int32_t totpCode = -1;
2,805,434✔
181
  if (totp != NULL) {
2,805,434✔
182
    char* endptr = NULL;
35✔
183
    totpCode = taosStr2Int32(totp, &endptr, 10);
35✔
184
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
35✔
185
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
×
186
    }
187
  }
188

189
  SCorEpSet epSet = {0};
2,805,434✔
190
  if (ip) {
2,805,027✔
191
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
958,521✔
192
  } else {
193
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
1,846,506✔
194
  }
195

196
  if (port) {
2,803,942✔
197
    epSet.epSet.eps[0].port = port;
112,939✔
198
    epSet.epSet.eps[1].port = port;
112,939✔
199
  }
200

201
  char* key = getClusterKey(user, secretEncrypt, ip, port);
2,803,942✔
202
  if (NULL == key) {
2,804,176✔
203
    TSC_ERR_RET(terrno);
×
204
  }
205
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
2,804,176✔
206
          user, db, key);
207
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
7,457,029✔
208
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
4,652,034✔
209
  }
210

211
  SAppInstInfo** pInst = NULL;
2,804,995✔
212
  code = taosThreadMutexLock(&appInfo.mutex);
2,804,995✔
213
  if (TSDB_CODE_SUCCESS != code) {
2,804,995✔
214
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
215
    TSC_ERR_RET(code);
×
216
  }
217

218
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
2,804,995✔
219
  SAppInstInfo* p = NULL;
2,804,995✔
220
  if (pInst == NULL) {
2,804,995✔
221
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
1,310,512✔
222
    if (NULL == p) {
1,310,512✔
223
      TSC_ERR_JRET(terrno);
×
224
    }
225
    p->mgmtEp = epSet;
1,310,512✔
226
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
1,310,512✔
227
    if (TSDB_CODE_SUCCESS != code) {
1,310,512✔
228
      taosMemoryFree(p);
×
229
      TSC_ERR_JRET(code);
×
230
    }
231
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
1,310,512✔
232
    if (TSDB_CODE_SUCCESS != code) {
1,310,512✔
233
      taosMemoryFree(p);
48✔
234
      TSC_ERR_JRET(code);
48✔
235
    }
236
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
1,310,464✔
237
    if (TSDB_CODE_SUCCESS != code) {
1,310,464✔
238
      destroyAppInst(&p);
×
239
      TSC_ERR_JRET(code);
×
240
    }
241
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
1,310,464✔
242
    if (TSDB_CODE_SUCCESS != code) {
1,310,464✔
243
      destroyAppInst(&p);
×
244
      TSC_ERR_JRET(code);
×
245
    }
246
    p->instKey = key;
1,310,464✔
247
    key = NULL;
1,310,464✔
248
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
1,310,464✔
249

250
    pInst = &p;
1,310,464✔
251
  } else {
252
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
1,494,483✔
253
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
254
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
255
    }
256
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
257
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
1,494,483✔
258
  }
259

260
_return:
2,804,995✔
261

262
  if (TSDB_CODE_SUCCESS != code) {
2,804,995✔
263
    (void)taosThreadMutexUnlock(&appInfo.mutex);
48✔
264
    taosMemoryFreeClear(key);
48✔
265
    return code;
48✔
266
  } else {
267
    code = taosThreadMutexUnlock(&appInfo.mutex);
2,804,947✔
268
    taosMemoryFreeClear(key);
2,804,947✔
269
    if (TSDB_CODE_SUCCESS != code) {
2,804,947✔
270
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
271
      return code;
×
272
    }
273
    SSessParam pPara = {.type = SESSION_PER_USER, .value = 1};
2,804,947✔
274
    code = sessMgtUpdateUserMetric((char*)user, &pPara);
2,804,947✔
275
    if (TSDB_CODE_SUCCESS != code) {
2,804,947✔
276
      tscError("failed to connect with user:%s, code:%s", user, tstrerror(code));
×
277
      return code;
×
278
    }
279
    return taosConnectImpl(user, &secretEncrypt[0], totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
2,804,947✔
280
  }
281
}
282

283
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
284
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
285
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
286
//     return *ppAppInstInfo;
287
//   } else {
288
//     return NULL;
289
//   }
290
// }
291

292
void freeQueryParam(SSyncQueryParam* param) {
571,256✔
293
  if (param == NULL) return;
571,256✔
294
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
571,256✔
295
    tscError("failed to destroy semaphore in freeQueryParam");
×
296
  }
297
  taosMemoryFree(param);
571,256✔
298
}
299

300
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
745,059,255✔
301
                     SRequestObj** pRequest, int64_t reqid) {
302
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
745,059,255✔
303
  if (TSDB_CODE_SUCCESS != code) {
745,064,932✔
304
    tscError("failed to malloc sqlObj, %s", sql);
×
305
    return code;
×
306
  }
307

308
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
745,064,932✔
309
  if ((*pRequest)->sqlstr == NULL) {
745,065,069✔
310
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
311
    destroyRequest(*pRequest);
×
312
    *pRequest = NULL;
×
313
    return terrno;
×
314
  }
315

316
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
745,063,647✔
317
  (*pRequest)->sqlstr[sqlLen] = 0;
745,072,459✔
318
  (*pRequest)->sqlLen = sqlLen;
745,071,806✔
319
  (*pRequest)->validateOnly = validateSql;
745,072,915✔
320
  (*pRequest)->stmtBindVersion = 0;
745,072,867✔
321

322
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
745,071,438✔
323

324
  STscObj* pTscObj = (*pRequest)->pTscObj;
745,072,069✔
325
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
745,071,580✔
326
                             sizeof((*pRequest)->self));
327
  if (err) {
745,065,669✔
328
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
329
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
330
    destroyRequest(*pRequest);
×
331
    *pRequest = NULL;
×
332
    return terrno;
×
333
  }
334

335
  (*pRequest)->allocatorRefId = -1;
745,065,669✔
336
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
745,070,250✔
337
    if (TSDB_CODE_SUCCESS !=
275,568,422✔
338
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
275,563,276✔
339
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
340
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
341
      destroyRequest(*pRequest);
×
342
      *pRequest = NULL;
×
343
      return terrno;
×
344
    }
345
  }
346

347
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
745,076,152✔
348
  return TSDB_CODE_SUCCESS;
745,068,178✔
349
}
350

351
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
352
  int32_t code =
353
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
354
  if (TSDB_CODE_SUCCESS == code) {
×
355
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
356
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
357
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
358
    (*pNewRequest)->isSubReq = true;
×
359
  }
360
  return code;
×
361
}
362

363
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
690,404✔
364
  STscObj* pTscObj = pRequest->pTscObj;
690,404✔
365

366
  SParseContext cxt = {
690,514✔
367
      .requestId = pRequest->requestId,
690,156✔
368
      .requestRid = pRequest->self,
690,694✔
369
      .acctId = pTscObj->acctId,
690,796✔
370
      .db = pRequest->pDb,
690,385✔
371
      .topicQuery = topicQuery,
372
      .pSql = pRequest->sqlstr,
690,419✔
373
      .sqlLen = pRequest->sqlLen,
690,144✔
374
      .pMsg = pRequest->msgBuf,
690,438✔
375
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
376
      .pTransporter = pTscObj->pAppInfo->pTransporter,
689,557✔
377
      .pStmtCb = pStmtCb,
378
      .pUser = pTscObj->user,
690,370✔
379
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
689,282✔
380
      .enableSysInfo = pTscObj->sysInfo,
690,283✔
381
      .svrVer = pTscObj->sVer,
690,205✔
382
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
690,088✔
383
      .stmtBindVersion = pRequest->stmtBindVersion,
690,096✔
384
      .setQueryFp = setQueryRequest,
385
      .timezone = pTscObj->optionInfo.timezone,
690,061✔
386
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
690,344✔
387
  };
388

389
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
690,137✔
390
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
690,514✔
391
  if (code != TSDB_CODE_SUCCESS) {
690,660✔
392
    return code;
×
393
  }
394

395
  code = qParseSql(&cxt, pQuery);
690,660✔
396
  if (TSDB_CODE_SUCCESS == code) {
690,111✔
397
    if ((*pQuery)->haveResultSet) {
688,291✔
398
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
399
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
×
400
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
401
    }
402
  }
403

404
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
690,167✔
405
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
688,167✔
406
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
688,405✔
407
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
688,337✔
408
  }
409

410
  taosArrayDestroy(cxt.pTableMetaPos);
690,048✔
411
  taosArrayDestroy(cxt.pTableVgroupPos);
689,637✔
412

413
  return code;
689,773✔
414
}
415

416
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
417
  SRetrieveTableRsp* pRsp = NULL;
×
418
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
419
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
420
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
421
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
422
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
423
                                 pRequest->stmtBindVersion > 0);
×
424
  }
425

426
  return code;
×
427
}
428

429
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
349,205✔
430
  // drop table if exists not_exists_table
431
  if (NULL == pQuery->pCmdMsg) {
349,205✔
432
    return TSDB_CODE_SUCCESS;
×
433
  }
434

435
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
349,205✔
436
  pRequest->type = pMsgInfo->msgType;
349,205✔
437
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
349,205✔
438
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
349,205✔
439

440
  STscObj*      pTscObj = pRequest->pTscObj;
349,205✔
441
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
349,205✔
442

443
  // int64_t transporterId = 0;
444
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
349,205✔
445
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
349,205✔
446
  return TSDB_CODE_SUCCESS;
349,205✔
447
}
448

449
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
1,299,872,857✔
450

451
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
5,274,243✔
452
  SRetrieveTableRsp* pRsp = NULL;
5,274,243✔
453
  if (pRequest->validateOnly) {
5,274,243✔
454
    doRequestCallback(pRequest, 0);
12,420✔
455
    return;
12,420✔
456
  }
457

458
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
10,509,377✔
459
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
10,509,433✔
460
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
5,261,823✔
461
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
2,826,171✔
462
                                 pRequest->stmtBindVersion > 0);
2,826,171✔
463
  }
464

465
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
5,261,823✔
466
  pRequest->code = code;
5,261,823✔
467

468
  if (pRequest->code != TSDB_CODE_SUCCESS) {
5,261,823✔
469
    pResultInfo->numOfRows = 0;
3,192✔
470
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
3,192✔
471
             pRequest->requestId);
472
  } else {
473
    tscDebug(
5,258,631✔
474
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
475
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
476
  }
477

478
  doRequestCallback(pRequest, code);
5,261,823✔
479
}
480

481
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
17,071,311✔
482
  if (pRequest->validateOnly) {
17,071,311✔
483
    doRequestCallback(pRequest, 0);
×
484
    return TSDB_CODE_SUCCESS;
×
485
  }
486

487
  // drop table if exists not_exists_table
488
  if (NULL == pQuery->pCmdMsg) {
17,071,567✔
489
    doRequestCallback(pRequest, 0);
7,993✔
490
    return TSDB_CODE_SUCCESS;
7,993✔
491
  }
492

493
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
17,063,392✔
494
  pRequest->type = pMsgInfo->msgType;
17,063,574✔
495
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
17,063,574✔
496
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
17,063,392✔
497

498
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
17,063,574✔
499
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
17,063,392✔
500

501
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
17,063,574✔
502
  if (code) {
17,063,574✔
503
    doRequestCallback(pRequest, code);
×
504
  }
505
  return code;
17,063,574✔
506
}
507

508
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
427,357✔
509
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
427,357✔
510
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
427,357✔
511

512
  if (node1->load < node2->load) {
427,357✔
513
    return -1;
×
514
  }
515

516
  return node1->load > node2->load;
427,357✔
517
}
518

519
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
66,988✔
520
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
66,988✔
521
  if (pInfo->pQnodeList) {
66,988✔
522
    taosArrayDestroy(pInfo->pQnodeList);
64,418✔
523
    pInfo->pQnodeList = NULL;
64,418✔
524
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
64,418✔
525
  }
526

527
  if (pNodeList) {
66,988✔
528
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
66,988✔
529
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
66,988✔
530
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
66,988✔
531
             taosArrayGetSize(pInfo->pQnodeList));
532
  }
533
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
66,988✔
534

535
  return TSDB_CODE_SUCCESS;
66,988✔
536
}
537

538
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
745,145,307✔
539
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
745,145,307✔
540
    *required = false;
744,992,428✔
541
    return TSDB_CODE_SUCCESS;
744,989,198✔
542
  }
543

544
  int32_t       code = TSDB_CODE_SUCCESS;
152,879✔
545
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
152,879✔
546
  *required = false;
152,879✔
547

548
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
152,879✔
549
  *required = (NULL == pInfo->pQnodeList);
152,879✔
550
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
152,879✔
551
  return TSDB_CODE_SUCCESS;
152,879✔
552
}
553

554
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
555
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
556
  int32_t       code = 0;
×
557

558
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
559
  if (pInfo->pQnodeList) {
×
560
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
561
  }
562
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
563
  if (NULL == *pNodeList) {
×
564
    SCatalog* pCatalog = NULL;
×
565
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
566
    if (TSDB_CODE_SUCCESS == code) {
×
567
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
568
      if (NULL == pNodeList) {
×
569
        TSC_ERR_RET(terrno);
×
570
      }
571
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
572
                               .requestId = pRequest->requestId,
×
573
                               .requestObjRefId = pRequest->self,
×
574
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
575
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
576
    }
577

578
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
579
      code = updateQnodeList(pInfo, *pNodeList);
×
580
    }
581
  }
582

583
  return code;
×
584
}
585

586
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
6,535,795✔
587
  pRequest->type = pQuery->msgType;
6,535,795✔
588
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
6,535,761✔
589

590
  SPlanContext cxt = {.queryId = pRequest->requestId,
7,407,597✔
591
                      .acctId = pRequest->pTscObj->acctId,
6,535,761✔
592
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
6,535,369✔
593
                      .pAstRoot = pQuery->pRoot,
6,536,049✔
594
                      .showRewrite = pQuery->showRewrite,
6,536,083✔
595
                      .pMsg = pRequest->msgBuf,
6,535,947✔
596
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
597
                      .pUser = pRequest->pTscObj->user,
6,535,947✔
598
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
6,535,675✔
599
                      .sysInfo = pRequest->pTscObj->sysInfo};
6,535,743✔
600

601
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
6,535,641✔
602
}
603

604
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
147,523,869✔
605
                         const SExtSchema* pExtSchema, bool isStmt) {
606
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
147,523,869✔
UNCOV
607
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
608
    return TSDB_CODE_INVALID_PARA;
×
609
  }
610

611
  pResInfo->numOfCols = numOfCols;
147,524,515✔
612
  if (pResInfo->fields != NULL) {
147,524,756✔
613
    taosMemoryFree(pResInfo->fields);
21,594✔
614
  }
615
  if (pResInfo->userFields != NULL) {
147,523,346✔
616
    taosMemoryFree(pResInfo->userFields);
21,594✔
617
  }
618
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
147,523,487✔
619
  if (NULL == pResInfo->fields) return terrno;
147,523,306✔
620
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
147,523,227✔
621
  if (NULL == pResInfo->userFields) {
147,522,265✔
622
    taosMemoryFree(pResInfo->fields);
×
623
    return terrno;
×
624
  }
625
  if (numOfCols != pResInfo->numOfCols) {
147,522,457✔
626
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
627
    return TSDB_CODE_FAILED;
×
628
  }
629

630
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
814,785,479✔
631
    pResInfo->fields[i].type = pSchema[i].type;
667,264,422✔
632

633
    pResInfo->userFields[i].type = pSchema[i].type;
667,261,623✔
634
    // userFields must convert to type bytes, no matter isStmt or not
635
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
667,264,587✔
636
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
667,265,162✔
637
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
667,265,425✔
638
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,456,187✔
639
    }
640

641
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
667,264,415✔
642
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
667,265,225✔
643
  }
644
  return TSDB_CODE_SUCCESS;
147,525,205✔
645
}
646

647
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
117,066,652✔
648
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
117,066,652✔
649
      precision != TSDB_TIME_PRECISION_NANO) {
650
    return;
×
651
  }
652

653
  pResInfo->precision = precision;
117,066,652✔
654
}
655

656
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
121,492,873✔
657
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
121,492,873✔
658
  if (NULL == nodeList) {
121,496,733✔
UNCOV
659
    return terrno;
×
660
  }
661
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
121,497,255✔
662

663
  int32_t dbNum = taosArrayGetSize(pDbVgList);
121,497,255✔
664
  for (int32_t i = 0; i < dbNum; ++i) {
240,715,491✔
665
    SArray* pVg = taosArrayGetP(pDbVgList, i);
119,217,192✔
666
    if (NULL == pVg) {
119,217,772✔
667
      continue;
×
668
    }
669
    int32_t vgNum = taosArrayGetSize(pVg);
119,217,772✔
670
    if (vgNum <= 0) {
119,217,806✔
671
      continue;
719,061✔
672
    }
673

674
    for (int32_t j = 0; j < vgNum; ++j) {
377,584,510✔
675
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
259,085,688✔
676
      if (NULL == pInfo) {
259,087,267✔
677
        taosArrayDestroy(nodeList);
×
678
        return TSDB_CODE_OUT_OF_RANGE;
×
679
      }
680
      SQueryNodeLoad load = {0};
259,087,267✔
681
      load.addr.nodeId = pInfo->vgId;
259,086,403✔
682
      load.addr.epSet = pInfo->epSet;
259,087,707✔
683

684
      if (NULL == taosArrayPush(nodeList, &load)) {
259,083,510✔
685
        taosArrayDestroy(nodeList);
×
686
        return terrno;
×
687
      }
688
    }
689
  }
690

691
  int32_t vnodeNum = taosArrayGetSize(nodeList);
121,498,299✔
692
  if (vnodeNum > 0) {
121,498,077✔
693
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
118,206,823✔
694
    goto _return;
118,206,180✔
695
  }
696

697
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
3,291,254✔
698
  if (mnodeNum <= 0) {
3,290,175✔
699
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
700
    goto _return;
×
701
  }
702

703
  void* pData = taosArrayGet(pMnodeList, 0);
3,290,175✔
704
  if (NULL == pData) {
3,290,175✔
705
    taosArrayDestroy(nodeList);
×
706
    return TSDB_CODE_OUT_OF_RANGE;
×
707
  }
708
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
3,290,175✔
709
    taosArrayDestroy(nodeList);
×
710
    return terrno;
×
711
  }
712

713
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
3,290,175✔
714

715
_return:
57,600✔
716

717
  *pNodeList = nodeList;
121,496,163✔
718

719
  return TSDB_CODE_SUCCESS;
121,496,184✔
720
}
721

722
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
84,179✔
723
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
84,179✔
724
  if (NULL == nodeList) {
84,179✔
725
    return terrno;
×
726
  }
727

728
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
84,179✔
729
  if (qNodeNum > 0) {
84,179✔
730
    void* pData = taosArrayGet(pQnodeList, 0);
23✔
731
    if (NULL == pData) {
23✔
732
      taosArrayDestroy(nodeList);
×
733
      return TSDB_CODE_OUT_OF_RANGE;
×
734
    }
735
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
23✔
736
      taosArrayDestroy(nodeList);
×
737
      return terrno;
×
738
    }
739
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
23✔
740
    goto _return;
23✔
741
  }
742

743
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
84,156✔
744
  if (mnodeNum <= 0) {
84,156✔
745
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
4✔
746
    goto _return;
4✔
747
  }
748

749
  void* pData = taosArrayGet(pMnodeList, 0);
84,152✔
750
  if (NULL == pData) {
84,152✔
751
    taosArrayDestroy(nodeList);
×
752
    return TSDB_CODE_OUT_OF_RANGE;
×
753
  }
754
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
84,152✔
755
    taosArrayDestroy(nodeList);
×
756
    return terrno;
×
757
  }
758

759
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
84,152✔
760

761
_return:
×
762

763
  *pNodeList = nodeList;
84,179✔
764

765
  return TSDB_CODE_SUCCESS;
84,179✔
766
}
767

768
void freeVgList(void* list) {
6,488,974✔
769
  SArray* pList = *(SArray**)list;
6,488,974✔
770
  taosArrayDestroy(pList);
6,489,847✔
771
}
6,489,544✔
772

773
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
115,044,671✔
774
  SArray* pDbVgList = NULL;
115,044,671✔
775
  SArray* pQnodeList = NULL;
115,044,671✔
776
  FDelete fp = NULL;
115,044,671✔
777
  int32_t code = 0;
115,044,671✔
778

779
  switch (tsQueryPolicy) {
115,044,671✔
780
    case QUERY_POLICY_VNODE:
114,961,605✔
781
    case QUERY_POLICY_CLIENT: {
782
      if (pResultMeta) {
114,961,605✔
783
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
114,961,814✔
784
        if (NULL == pDbVgList) {
114,961,374✔
785
          code = terrno;
×
786
          goto _return;
×
787
        }
788
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
114,961,374✔
789
        for (int32_t i = 0; i < dbNum; ++i) {
227,687,903✔
790
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
112,727,005✔
791
          if (pRes->code || NULL == pRes->pRes) {
112,727,549✔
792
            continue;
1,082✔
793
          }
794

795
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
225,452,118✔
796
            code = terrno;
×
797
            goto _return;
×
798
          }
799
        }
800
      } else {
UNCOV
801
        fp = freeVgList;
×
802

UNCOV
803
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
804
        if (dbNum > 0) {
×
805
          SCatalog*     pCtg = NULL;
×
806
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
807
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
808
          if (code != TSDB_CODE_SUCCESS) {
×
809
            goto _return;
×
810
          }
811

812
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
813
          if (NULL == pDbVgList) {
×
814
            code = terrno;
×
815
            goto _return;
×
816
          }
817
          SArray* pVgList = NULL;
×
818
          for (int32_t i = 0; i < dbNum; ++i) {
×
819
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
820
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
821
                                     .requestId = pRequest->requestId,
×
822
                                     .requestObjRefId = pRequest->self,
×
823
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
824

825
            // catalogGetDBVgList will handle dbFName == null.
826
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
827
            if (code) {
×
828
              goto _return;
×
829
            }
830

831
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
832
              code = terrno;
×
833
              goto _return;
×
834
            }
835
          }
836
        }
837
      }
838

839
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
114,960,898✔
840
      break;
114,961,865✔
841
    }
842
    case QUERY_POLICY_HYBRID:
84,179✔
843
    case QUERY_POLICY_QNODE: {
844
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
168,343✔
845
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
84,164✔
846
        if (pRes->code) {
84,164✔
847
          pQnodeList = NULL;
×
848
        } else {
849
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
84,164✔
850
          if (NULL == pQnodeList) {
84,164✔
851
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
852
            goto _return;
×
853
          }
854
        }
855
      } else {
856
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
15✔
857
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
15✔
858
        if (pInst->pQnodeList) {
15✔
859
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
15✔
860
          if (NULL == pQnodeList) {
15✔
861
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
862
            goto _return;
×
863
          }
864
        }
865
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
15✔
866
      }
867

868
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
84,179✔
869
      break;
84,179✔
870
    }
871
    default:
×
872
      tscError("unknown query policy: %d", tsQueryPolicy);
×
873
      return TSDB_CODE_APP_ERROR;
×
874
  }
875

876
_return:
115,046,044✔
877
  taosArrayDestroyEx(pDbVgList, fp);
115,046,044✔
878
  taosArrayDestroy(pQnodeList);
115,046,270✔
879

880
  return code;
115,046,270✔
881
}
882

883
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
6,531,874✔
884
  SArray* pDbVgList = NULL;
6,531,874✔
885
  SArray* pQnodeList = NULL;
6,531,874✔
886
  int32_t code = 0;
6,533,852✔
887

888
  switch (tsQueryPolicy) {
6,533,852✔
889
    case QUERY_POLICY_VNODE:
6,532,658✔
890
    case QUERY_POLICY_CLIENT: {
891
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
6,532,658✔
892
      if (dbNum > 0) {
6,535,463✔
893
        SCatalog*     pCtg = NULL;
6,490,563✔
894
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
6,489,261✔
895
        code = catalogGetHandle(pInst->clusterId, &pCtg);
6,490,535✔
896
        if (code != TSDB_CODE_SUCCESS) {
6,485,990✔
897
          goto _return;
×
898
        }
899

900
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
6,485,990✔
901
        if (NULL == pDbVgList) {
6,488,574✔
902
          code = terrno;
152✔
903
          goto _return;
×
904
        }
905
        SArray* pVgList = NULL;
6,488,422✔
906
        for (int32_t i = 0; i < dbNum; ++i) {
12,977,774✔
907
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
6,487,901✔
908
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
6,489,302✔
909
                                   .requestId = pRequest->requestId,
6,488,169✔
910
                                   .requestObjRefId = pRequest->self,
6,488,367✔
911
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
6,487,747✔
912

913
          // catalogGetDBVgList will handle dbFName == null.
914
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
6,490,715✔
915
          if (code) {
6,488,701✔
916
            goto _return;
×
917
          }
918

919
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
6,489,301✔
920
            code = terrno;
×
921
            goto _return;
×
922
          }
923
        }
924
      }
925

926
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
6,534,966✔
927
      break;
6,534,232✔
928
    }
929
    case QUERY_POLICY_HYBRID:
×
930
    case QUERY_POLICY_QNODE: {
931
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
932

933
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
934
      break;
×
935
    }
936
    default:
1,195✔
937
      tscError("unknown query policy: %d", tsQueryPolicy);
1,195✔
938
      return TSDB_CODE_APP_ERROR;
×
939
  }
940

941
_return:
6,534,835✔
942

943
  taosArrayDestroyEx(pDbVgList, freeVgList);
6,534,305✔
944
  taosArrayDestroy(pQnodeList);
6,533,763✔
945

946
  return code;
6,534,449✔
947
}
948

949
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
6,533,523✔
950
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
6,533,523✔
951

952
  SExecResult      res = {0};
6,534,864✔
953
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
6,534,985✔
954
                           .requestId = pRequest->requestId,
6,535,112✔
955
                           .requestObjRefId = pRequest->self};
6,534,428✔
956
  SSchedulerReq    req = {
7,406,192✔
957
         .syncReq = true,
958
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
6,534,321✔
959
         .pConn = &conn,
960
         .pNodeList = pNodeList,
961
         .pDag = pDag,
962
         .sql = pRequest->sqlstr,
6,534,321✔
963
         .startTs = pRequest->metric.start,
6,534,640✔
964
         .execFp = NULL,
965
         .cbParam = NULL,
966
         .chkKillFp = chkRequestKilled,
967
         .chkKillParam = (void*)pRequest->self,
6,532,882✔
968
         .pExecRes = &res,
969
         .source = pRequest->source,
6,535,417✔
970
         .pWorkerCb = getTaskPoolWorkerCb(),
6,533,707✔
971
  };
972

973
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
6,533,434✔
974

975
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
6,535,099✔
976
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
6,535,141✔
977

978
  if (code != TSDB_CODE_SUCCESS) {
6,534,526✔
979
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
980

981
    pRequest->code = code;
×
982
    terrno = code;
×
983
    return pRequest->code;
×
984
  }
985

986
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
6,534,526✔
987
      TDMT_VND_CREATE_TABLE == pRequest->type) {
17,350✔
988
    pRequest->body.resInfo.numOfRows = res.numOfRows;
6,522,880✔
989
    if (TDMT_VND_SUBMIT == pRequest->type) {
6,523,237✔
990
      STscObj*            pTscObj = pRequest->pTscObj;
6,517,739✔
991
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
6,517,360✔
992
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
6,518,539✔
993
    }
994

995
    schedulerFreeJob(&pRequest->body.queryJob, 0);
6,523,518✔
996
  }
997

998
  pRequest->code = res.code;
6,535,569✔
999
  terrno = res.code;
6,535,826✔
1000
  return pRequest->code;
6,534,697✔
1001
}
1002

1003
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
473,856,244✔
1004
  SArray*      pArray = NULL;
473,856,244✔
1005
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
473,856,244✔
1006
  if (NULL == pRsp->aCreateTbRsp) {
473,856,244✔
1007
    return TSDB_CODE_SUCCESS;
464,709,053✔
1008
  }
1009

1010
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
9,156,585✔
1011
  for (int32_t i = 0; i < tbNum; ++i) {
21,911,706✔
1012
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
12,755,054✔
1013
    if (pTbRsp->pMeta) {
12,754,952✔
1014
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
12,153,335✔
1015
    }
1016
  }
1017

1018
  return TSDB_CODE_SUCCESS;
9,156,652✔
1019
}
1020

1021
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
101,684,001✔
1022
  int32_t code = 0;
101,684,001✔
1023
  SArray* pArray = NULL;
101,684,001✔
1024
  SArray* pTbArray = (SArray*)res;
101,684,001✔
1025
  int32_t tbNum = taosArrayGetSize(pTbArray);
101,684,001✔
1026
  if (tbNum <= 0) {
101,684,294✔
1027
    return TSDB_CODE_SUCCESS;
×
1028
  }
1029

1030
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
101,684,294✔
1031
  if (NULL == pArray) {
101,683,550✔
1032
    return terrno;
×
1033
  }
1034

1035
  for (int32_t i = 0; i < tbNum; ++i) {
336,970,834✔
1036
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
235,287,050✔
1037
    if (NULL == tbInfo) {
235,287,539✔
1038
      code = terrno;
×
1039
      goto _return;
×
1040
    }
1041
    STbSVersion tbSver = {
235,287,539✔
1042
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
235,287,050✔
1043
    if (NULL == taosArrayPush(pArray, &tbSver)) {
235,287,539✔
1044
      code = terrno;
×
1045
      goto _return;
×
1046
    }
1047
  }
1048

1049
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
101,683,784✔
1050
                           .requestId = pRequest->requestId,
101,684,014✔
1051
                           .requestObjRefId = pRequest->self,
101,684,014✔
1052
                           .mgmtEps = *epset};
1053

1054
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
101,683,759✔
1055

1056
_return:
101,684,481✔
1057

1058
  taosArrayDestroy(pArray);
101,683,746✔
1059
  return code;
101,684,226✔
1060
}
1061

1062
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
9,121,508✔
1063
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
9,121,508✔
1064
}
1065

1066
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
67,683,848✔
1067
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
67,683,848✔
1068
}
1069

1070
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
654,203,199✔
1071
  if (NULL == pRequest->body.resInfo.execRes.res) {
654,203,199✔
1072
    return pRequest->code;
25,274,500✔
1073
  }
1074

1075
  SCatalog*     pCatalog = NULL;
628,916,737✔
1076
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
628,921,767✔
1077

1078
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
628,938,009✔
1079
  if (code) {
628,929,831✔
1080
    return code;
×
1081
  }
1082

1083
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
628,929,831✔
1084
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
628,944,441✔
1085

1086
  switch (pRes->msgType) {
628,944,616✔
1087
    case TDMT_VND_ALTER_TABLE:
3,985,166✔
1088
    case TDMT_MND_ALTER_STB: {
1089
      code = handleAlterTbExecRes(pRes->res, pCatalog);
3,985,166✔
1090
      break;
3,985,166✔
1091
    }
1092
    case TDMT_VND_CREATE_TABLE: {
49,059,172✔
1093
      SArray* pList = (SArray*)pRes->res;
49,059,172✔
1094
      int32_t num = taosArrayGetSize(pList);
49,065,495✔
1095
      for (int32_t i = 0; i < num; ++i) {
102,639,983✔
1096
        void* res = taosArrayGetP(pList, i);
53,573,237✔
1097
        // handleCreateTbExecRes will handle res == null
1098
        code = handleCreateTbExecRes(res, pCatalog);
53,573,195✔
1099
      }
1100
      break;
49,066,746✔
1101
    }
1102
    case TDMT_MND_CREATE_STB: {
342,286✔
1103
      code = handleCreateTbExecRes(pRes->res, pCatalog);
342,286✔
1104
      break;
342,286✔
1105
    }
1106
    case TDMT_VND_SUBMIT: {
473,857,969✔
1107
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
473,857,969✔
1108

1109
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
473,867,419✔
1110
      break;
473,862,641✔
1111
    }
1112
    case TDMT_SCH_QUERY:
101,684,001✔
1113
    case TDMT_SCH_MERGE_QUERY: {
1114
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
101,684,001✔
1115
      break;
101,684,504✔
1116
    }
1117
    default:
319✔
1118
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
319✔
1119
               pRequest->type, pRequest->requestId);
1120
      code = TSDB_CODE_APP_ERROR;
×
1121
  }
1122

1123
  return code;
628,941,343✔
1124
}
1125

1126
static bool incompletaFileParsing(SNode* pStmt) {
633,307,050✔
1127
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
633,307,050✔
1128
}
1129

1130
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
1131
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1132

1133
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
1134
  if (TSDB_CODE_SUCCESS == code) {
×
1135
    int64_t analyseStart = taosGetTimestampUs();
×
1136
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
1137
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1138
  }
1139

1140
  if (TSDB_CODE_SUCCESS == code) {
×
1141
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1142
  }
1143

1144
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1145
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1146
}
×
1147

1148
void returnToUser(SRequestObj* pRequest) {
60,077,649✔
1149
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
60,077,649✔
1150
    // return to client
1151
    doRequestCallback(pRequest, pRequest->code);
60,077,649✔
1152
    return;
60,077,649✔
1153
  }
1154

1155
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1156
  if (pUserReq) {
×
1157
    pUserReq->code = pRequest->code;
×
1158
    // return to client
1159
    doRequestCallback(pUserReq, pUserReq->code);
×
1160
    (void)releaseRequest(pRequest->relation.userRefId);
×
1161
    return;
×
1162
  } else {
1163
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1164
             pRequest->relation.userRefId, pRequest->requestId);
1165
  }
1166
}
1167

1168
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1169
  int64_t     lastTs = 0;
×
1170
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
1171
  int32_t     numOfFields = taos_num_fields(pRes);
×
1172

1173
  int32_t code = createDataBlock(pBlock);
×
1174
  if (code) {
×
1175
    return code;
×
1176
  }
1177

1178
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1179
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1180
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
1181
    if (TSDB_CODE_SUCCESS != code) {
×
1182
      blockDataDestroy(*pBlock);
×
1183
      return code;
×
1184
    }
1185
  }
1186

1187
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1188
  if (TSDB_CODE_SUCCESS != code) {
×
1189
    blockDataDestroy(*pBlock);
×
1190
    return code;
×
1191
  }
1192

1193
  for (int32_t i = 0; i < numOfRows; ++i) {
×
1194
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
1195
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1196
      tscError("invalid data from vnode");
×
1197
      blockDataDestroy(*pBlock);
×
1198
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1199
    }
1200
    int64_t ts = *(int64_t*)pRow[0];
×
1201
    if (lastTs < ts) {
×
1202
      lastTs = ts;
×
1203
    }
1204

1205
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1206
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1207
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
1208
      if (TSDB_CODE_SUCCESS != code) {
×
1209
        blockDataDestroy(*pBlock);
×
1210
        return code;
×
1211
      }
1212
    }
1213

1214
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1215
            *(int64_t*)pRow[2]);
1216
  }
1217

1218
  (*pBlock)->info.window.ekey = lastTs;
×
1219
  (*pBlock)->info.rows = numOfRows;
×
1220

1221
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
1222
  return TSDB_CODE_SUCCESS;
×
1223
}
1224

1225
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
1226
  SRequestObj* pRequest = (SRequestObj*)res;
×
1227
  if (pRequest->code) {
×
1228
    returnToUser(pRequest);
×
1229
    return;
×
1230
  }
1231

1232
  SSDataBlock* pBlock = NULL;
×
1233
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1234
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1235
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1236
             tstrerror(pRequest->code));
1237
    returnToUser(pRequest);
×
1238
    return;
×
1239
  }
1240

1241
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1242
  if (pNextReq) {
×
1243
    continuePostSubQuery(pNextReq, pBlock);
×
1244
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1245
  } else {
1246
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1247
             pRequest->relation.nextRefId, pRequest->requestId);
1248
  }
1249

1250
  blockDataDestroy(pBlock);
×
1251
}
1252

1253
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
1254
  SRequestObj* pRequest = pWrapper->pRequest;
×
1255
  if (TD_RES_QUERY(pRequest)) {
×
1256
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
1257
    return;
×
1258
  }
1259

1260
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1261
  if (pNextReq) {
×
1262
    continuePostSubQuery(pNextReq, NULL);
×
1263
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1264
  } else {
1265
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1266
             pRequest->relation.nextRefId, pRequest->requestId);
1267
  }
1268
}
1269

1270
// todo refacto the error code  mgmt
1271
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
647,375,770✔
1272
  SSqlCallbackWrapper* pWrapper = param;
647,375,770✔
1273
  SRequestObj*         pRequest = pWrapper->pRequest;
647,375,770✔
1274
  STscObj*             pTscObj = pRequest->pTscObj;
647,377,112✔
1275

1276
  pRequest->code = code;
647,381,355✔
1277
  if (pResult) {
647,383,791✔
1278
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
647,343,466✔
1279
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
647,348,940✔
1280
  }
1281

1282
  int32_t type = pRequest->type;
647,377,399✔
1283
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
647,362,814✔
1284
    if (pResult) {
520,120,115✔
1285
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
520,107,520✔
1286

1287
      // record the insert rows
1288
      if (TDMT_VND_SUBMIT == type) {
520,109,287✔
1289
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
467,650,118✔
1290
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
467,657,328✔
1291
      }
1292
    }
1293
    schedulerFreeJob(&pRequest->body.queryJob, 0);
520,130,989✔
1294
  }
1295

1296
  taosMemoryFree(pResult);
647,383,123✔
1297
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
647,374,706✔
1298
           pRequest->requestId);
1299

1300
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
647,375,351✔
1301
      pRequest->stmtBindVersion == 0) {
49,254✔
1302
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
49,254✔
1303
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1304
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
49,254✔
1305
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1306
    }
1307
    restartAsyncQuery(pRequest, code);
49,254✔
1308
    return;
49,254✔
1309
  }
1310

1311
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
647,326,097✔
1312
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
647,326,097✔
1313
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
2,889,558✔
1314
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1315
    }
1316
  }
1317

1318
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
647,329,930✔
1319
  int32_t code1 = handleQueryExecRsp(pRequest);
647,326,640✔
1320
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
647,334,101✔
1321
    pRequest->code = code1;
×
1322
  }
1323

1324
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
1,280,637,837✔
1325
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
633,304,655✔
1326
    continueInsertFromCsv(pWrapper, pRequest);
13,102✔
1327
    return;
13,102✔
1328
  }
1329

1330
  if (pRequest->relation.nextRefId) {
647,322,984✔
1331
    handlePostSubQuery(pWrapper);
×
1332
  } else {
1333
    destorySqlCallbackWrapper(pWrapper);
647,322,630✔
1334
    pRequest->pWrapper = NULL;
647,317,876✔
1335

1336
    // return to client
1337
    doRequestCallback(pRequest, code);
647,322,192✔
1338
  }
1339
}
1340

1341
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
6,883,703✔
1342
  int32_t code = 0;
6,883,703✔
1343
  int32_t subplanNum = 0;
6,883,703✔
1344

1345
  if (pQuery->pRoot) {
6,883,703✔
1346
    pRequest->stmtType = pQuery->pRoot->type;
6,535,588✔
1347
  }
1348

1349
  if (pQuery->pRoot && !pRequest->inRetry) {
6,885,104✔
1350
    STscObj*            pTscObj = pRequest->pTscObj;
6,536,017✔
1351
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
6,535,915✔
1352
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
6,536,051✔
1353
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
6,524,906✔
1354
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
10,889✔
1355
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
10,769✔
1356
    }
1357
  }
1358

1359
  pRequest->body.execMode = pQuery->execMode;
6,884,880✔
1360
  switch (pQuery->execMode) {
6,884,948✔
1361
    case QUERY_EXEC_MODE_LOCAL:
×
1362
      if (!pRequest->validateOnly) {
×
1363
        if (NULL == pQuery->pRoot) {
×
1364
          terrno = TSDB_CODE_INVALID_PARA;
×
1365
          code = terrno;
×
1366
        } else {
1367
          code = execLocalCmd(pRequest, pQuery);
×
1368
        }
1369
      }
1370
      break;
×
1371
    case QUERY_EXEC_MODE_RPC:
349,205✔
1372
      if (!pRequest->validateOnly) {
349,205✔
1373
        code = execDdlQuery(pRequest, pQuery);
349,205✔
1374
      }
1375
      break;
349,205✔
1376
    case QUERY_EXEC_MODE_SCHEDULE: {
6,535,316✔
1377
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
6,535,316✔
1378
      if (NULL == pMnodeList) {
6,535,522✔
1379
        code = terrno;
×
1380
        break;
×
1381
      }
1382
      SQueryPlan* pDag = NULL;
6,535,522✔
1383
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
6,535,522✔
1384
      if (TSDB_CODE_SUCCESS == code) {
6,534,108✔
1385
        pRequest->body.subplanNum = pDag->numOfSubplans;
6,533,316✔
1386
        if (!pRequest->validateOnly) {
6,533,578✔
1387
          SArray* pNodeList = NULL;
6,534,136✔
1388
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
6,533,664✔
1389

1390
          if (TSDB_CODE_SUCCESS == code) {
6,534,164✔
1391
            SSessParam para = {.type = SESSION_MAX_CALL_VNODE_NUM, .value = taosArrayGetSize(pNodeList)};
6,534,230✔
1392
            code = tscUpdateSessMgtMetric(pRequest->pTscObj, &para);
6,534,977✔
1393
          }
1394

1395
          if (TSDB_CODE_SUCCESS == code) {
6,533,941✔
1396
            code = scheduleQuery(pRequest, pDag, pNodeList);
6,534,069✔
1397
          }
1398
          taosArrayDestroy(pNodeList);
6,533,101✔
1399
        }
1400
      }
1401
      taosArrayDestroy(pMnodeList);
6,534,627✔
1402
      break;
6,534,462✔
1403
    }
1404
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1405
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1406
      break;
×
1407
    default:
×
1408
      break;
×
1409
  }
1410

1411
  if (!keepQuery) {
6,884,074✔
1412
    qDestroyQuery(pQuery);
×
1413
  }
1414

1415
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
6,884,074✔
1416
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
7,067✔
1417
    if (TSDB_CODE_SUCCESS != ret) {
7,067✔
1418
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1419
               pRequest->requestId);
1420
    }
1421
  }
1422

1423
  if (TSDB_CODE_SUCCESS == code) {
6,882,750✔
1424
    code = handleQueryExecRsp(pRequest);
6,881,912✔
1425
  }
1426

1427
  if (TSDB_CODE_SUCCESS != code) {
6,883,439✔
1428
    pRequest->code = code;
4,517✔
1429
  }
1430

1431
  if (res) {
6,883,439✔
1432
    *res = pRequest->body.resInfo.execRes.res;
×
1433
    pRequest->body.resInfo.execRes.res = NULL;
×
1434
  }
1435
}
6,883,439✔
1436

1437
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
647,868,876✔
1438
                                 SSqlCallbackWrapper* pWrapper) {
1439
  int32_t code = TSDB_CODE_SUCCESS;
647,868,876✔
1440
  pRequest->type = pQuery->msgType;
647,868,876✔
1441
  SArray*     pMnodeList = NULL;
647,861,119✔
1442
  SQueryPlan* pDag = NULL;
647,861,119✔
1443
  int64_t     st = taosGetTimestampUs();
647,855,808✔
1444

1445
  if (!pRequest->parseOnly) {
647,855,808✔
1446
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
647,863,335✔
1447
    if (NULL == pMnodeList) {
647,858,579✔
1448
      code = terrno;
×
1449
    }
1450
    SPlanContext cxt = {.queryId = pRequest->requestId,
653,642,667✔
1451
                        .acctId = pRequest->pTscObj->acctId,
647,878,862✔
1452
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
647,878,676✔
1453
                        .pAstRoot = pQuery->pRoot,
647,893,418✔
1454
                        .showRewrite = pQuery->showRewrite,
647,894,611✔
1455
                        .isView = pWrapper->pParseCtx->isView,
647,883,187✔
1456
                        .isAudit = pWrapper->pParseCtx->isAudit,
647,885,179✔
1457
                        .pMsg = pRequest->msgBuf,
647,874,614✔
1458
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1459
                        .pUser = pRequest->pTscObj->user,
647,884,030✔
1460
                        .sysInfo = pRequest->pTscObj->sysInfo,
647,870,821✔
1461
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
647,860,848✔
1462
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
647,875,961✔
1463
    if (TSDB_CODE_SUCCESS == code) {
647,884,070✔
1464
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
647,877,208✔
1465
    }
1466
    if (code) {
647,864,304✔
1467
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
277,032✔
1468
               pRequest->requestId);
1469
    } else {
1470
      pRequest->body.subplanNum = pDag->numOfSubplans;
647,587,272✔
1471
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
647,594,434✔
1472
    }
1473
  }
1474

1475
  pRequest->metric.execStart = taosGetTimestampUs();
647,876,171✔
1476
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
647,862,882✔
1477

1478
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
650,744,453✔
1479
    SArray* pNodeList = NULL;
647,365,540✔
1480
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
647,360,052✔
1481
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
115,045,174✔
1482
    }
1483

1484
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
647,362,577✔
1485
                             .requestId = pRequest->requestId,
647,372,071✔
1486
                             .requestObjRefId = pRequest->self};
647,377,067✔
1487
    SSchedulerReq    req = {
650,249,236✔
1488
           .syncReq = false,
1489
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
647,364,447✔
1490
           .pConn = &conn,
1491
           .pNodeList = pNodeList,
1492
           .pDag = pDag,
1493
           .allocatorRefId = pRequest->allocatorRefId,
647,364,447✔
1494
           .sql = pRequest->sqlstr,
647,339,429✔
1495
           .startTs = pRequest->metric.start,
647,344,424✔
1496
           .execFp = schedulerExecCb,
1497
           .cbParam = pWrapper,
1498
           .chkKillFp = chkRequestKilled,
1499
           .chkKillParam = (void*)pRequest->self,
647,352,092✔
1500
           .pExecRes = NULL,
1501
           .source = pRequest->source,
647,339,788✔
1502
           .pWorkerCb = getTaskPoolWorkerCb(),
647,344,502✔
1503
    };
1504
    if (TSDB_CODE_SUCCESS == code) {
647,354,521✔
1505
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
647,383,974✔
1506
    }
1507

1508
    taosArrayDestroy(pNodeList);
647,347,704✔
1509
  } else {
1510
    qDestroyQueryPlan(pDag);
501,434✔
1511
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
506,225✔
1512
             pRequest->requestId);
1513
    destorySqlCallbackWrapper(pWrapper);
506,225✔
1514
    pRequest->pWrapper = NULL;
506,225✔
1515
    if (TSDB_CODE_SUCCESS != code) {
506,225✔
1516
      pRequest->code = terrno;
277,032✔
1517
    }
1518

1519
    doRequestCallback(pRequest, code);
506,225✔
1520
  }
1521

1522
  // todo not to be released here
1523
  taosArrayDestroy(pMnodeList);
647,887,738✔
1524

1525
  return code;
647,876,748✔
1526
}
1527

1528
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
670,890,049✔
1529
  int32_t code = 0;
670,890,049✔
1530

1531
  if (pRequest->parseOnly) {
670,890,049✔
1532
    doRequestCallback(pRequest, 0);
297,493✔
1533
    return;
297,493✔
1534
  }
1535

1536
  pRequest->body.execMode = pQuery->execMode;
670,605,171✔
1537
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
670,601,927✔
1538
    destorySqlCallbackWrapper(pWrapper);
22,725,920✔
1539
    pRequest->pWrapper = NULL;
22,725,920✔
1540
  }
1541

1542
  if (pQuery->pRoot && !pRequest->inRetry) {
670,522,104✔
1543
    STscObj*            pTscObj = pRequest->pTscObj;
670,592,928✔
1544
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
670,601,545✔
1545
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
670,594,130✔
1546
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
532,320,711✔
1547
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
467,448,665✔
1548
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
203,147,782✔
1549
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
108,650,199✔
1550
    }
1551
  }
1552

1553
  switch (pQuery->execMode) {
670,611,462✔
1554
    case QUERY_EXEC_MODE_LOCAL:
5,274,243✔
1555
      asyncExecLocalCmd(pRequest, pQuery);
5,274,243✔
1556
      break;
5,274,243✔
1557
    case QUERY_EXEC_MODE_RPC:
17,071,567✔
1558
      code = asyncExecDdlQuery(pRequest, pQuery);
17,071,567✔
1559
      break;
17,071,567✔
1560
    case QUERY_EXEC_MODE_SCHEDULE: {
647,850,949✔
1561
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
647,850,949✔
1562
      break;
647,882,731✔
1563
    }
1564
    case QUERY_EXEC_MODE_EMPTY_RESULT:
380,110✔
1565
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
380,110✔
1566
      doRequestCallback(pRequest, 0);
380,110✔
1567
      break;
380,110✔
1568
    default:
×
1569
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1570
      doRequestCallback(pRequest, -1);
×
1571
      break;
×
1572
  }
1573
}
1574

1575
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
11,696✔
1576
  SCatalog* pCatalog = NULL;
11,696✔
1577
  int32_t   code = 0;
11,696✔
1578
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
11,696✔
1579
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
11,696✔
1580

1581
  if (dbNum <= 0 && tblNum <= 0) {
11,696✔
1582
    return TSDB_CODE_APP_ERROR;
11,664✔
1583
  }
1584

1585
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
32✔
1586
  if (code != TSDB_CODE_SUCCESS) {
32✔
1587
    return code;
×
1588
  }
1589

1590
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
32✔
1591
                           .requestId = pRequest->requestId,
32✔
1592
                           .requestObjRefId = pRequest->self,
32✔
1593
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
32✔
1594

1595
  for (int32_t i = 0; i < dbNum; ++i) {
64✔
1596
    char* dbFName = taosArrayGet(pRequest->dbList, i);
32✔
1597

1598
    // catalogRefreshDBVgInfo will handle dbFName == null.
1599
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
32✔
1600
    if (code != TSDB_CODE_SUCCESS) {
32✔
1601
      return code;
×
1602
    }
1603
  }
1604

1605
  for (int32_t i = 0; i < tblNum; ++i) {
64✔
1606
    SName* tableName = taosArrayGet(pRequest->tableList, i);
32✔
1607

1608
    // catalogRefreshTableMeta will handle tableName == null.
1609
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
32✔
1610
    if (code != TSDB_CODE_SUCCESS) {
32✔
1611
      return code;
×
1612
    }
1613
  }
1614

1615
  return code;
32✔
1616
}
1617

1618
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
4,217,629✔
1619
  SCatalog* pCatalog = NULL;
4,217,629✔
1620
  int32_t   tbNum = taosArrayGetSize(tbList);
4,217,629✔
1621
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,217,629✔
1622
  if (code != TSDB_CODE_SUCCESS) {
4,217,629✔
1623
    return code;
×
1624
  }
1625

1626
  if (isView) {
4,217,629✔
1627
    for (int32_t i = 0; i < tbNum; ++i) {
836,136✔
1628
      SName* pViewName = taosArrayGet(tbList, i);
418,068✔
1629
      char   dbFName[TSDB_DB_FNAME_LEN];
415,488✔
1630
      if (NULL == pViewName) {
418,068✔
1631
        continue;
×
1632
      }
1633
      (void)tNameGetFullDbName(pViewName, dbFName);
418,068✔
1634
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
418,068✔
1635
    }
1636
  } else {
1637
    for (int32_t i = 0; i < tbNum; ++i) {
5,691,337✔
1638
      SName* pTbName = taosArrayGet(tbList, i);
1,891,776✔
1639
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
1,891,776✔
1640
    }
1641
  }
1642

1643
  return TSDB_CODE_SUCCESS;
4,217,629✔
1644
}
1645

1646
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
2,804,325✔
1647
  pEpSet->version = 0;
2,804,325✔
1648

1649
  // init mnode ip set
1650
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
2,805,049✔
1651
  mgmtEpSet->numOfEps = 0;
2,804,798✔
1652
  mgmtEpSet->inUse = 0;
2,804,666✔
1653

1654
  if (firstEp && firstEp[0] != 0) {
2,804,929✔
1655
    if (strlen(firstEp) >= TSDB_EP_LEN) {
2,804,798✔
1656
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1657
      return -1;
×
1658
    }
1659

1660
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
2,804,798✔
1661
    if (code != TSDB_CODE_SUCCESS) {
2,804,735✔
1662
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1663
      return terrno;
×
1664
    }
1665
    // uint32_t addr = 0;
1666
    SIpAddr addr = {0};
2,804,735✔
1667
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
2,805,010✔
1668
    if (code) {
2,805,072✔
1669
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
530✔
1670
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1671
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
565✔
1672
    } else {
1673
      mgmtEpSet->numOfEps++;
2,804,563✔
1674
    }
1675
  }
1676

1677
  if (secondEp && secondEp[0] != 0) {
2,804,113✔
1678
    if (strlen(secondEp) >= TSDB_EP_LEN) {
1,846,187✔
1679
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1680
      return terrno;
×
1681
    }
1682

1683
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
1,846,187✔
1684
    if (code != TSDB_CODE_SUCCESS) {
1,846,983✔
1685
      return code;
×
1686
    }
1687
    SIpAddr addr = {0};
1,846,983✔
1688
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
1,846,983✔
1689
    if (code) {
1,846,358✔
1690
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1691
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1692
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1693
    } else {
1694
      mgmtEpSet->numOfEps++;
1,846,358✔
1695
    }
1696
  }
1697

1698
  if (mgmtEpSet->numOfEps == 0) {
2,805,537✔
1699
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
565✔
1700
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
565✔
1701
  }
1702

1703
  return 0;
2,803,860✔
1704
}
1705

1706
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
2,804,947✔
1707
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1708
  *pTscObj = NULL;
2,804,947✔
1709
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
2,804,947✔
1710
  if (TSDB_CODE_SUCCESS != code) {
2,804,947✔
1711
    return code;
×
1712
  }
1713

1714
  SRequestObj* pRequest = NULL;
2,804,947✔
1715
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
2,804,947✔
1716
  if (TSDB_CODE_SUCCESS != code) {
2,804,285✔
1717
    destroyTscObj(*pTscObj);
×
1718
    return code;
×
1719
  }
1720

1721
  pRequest->sqlstr = taosStrdup("taos_connect");
2,804,285✔
1722
  if (pRequest->sqlstr) {
2,804,834✔
1723
    pRequest->sqlLen = strlen(pRequest->sqlstr);
2,804,834✔
1724
  } else {
1725
    return terrno;
×
1726
  }
1727

1728
  SMsgSendInfo* body = NULL;
2,804,834✔
1729
  code = buildConnectMsg(pRequest, &body, totpCode);
2,804,834✔
1730
  if (TSDB_CODE_SUCCESS != code) {
2,804,580✔
1731
    destroyTscObj(*pTscObj);
×
1732
    return code;
×
1733
  }
1734

1735
  // int64_t transporterId = 0;
1736
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
2,804,580✔
1737
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
2,804,325✔
1738
  if (TSDB_CODE_SUCCESS != code) {
2,804,946✔
1739
    destroyTscObj(*pTscObj);
×
1740
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1741
    return code;
×
1742
  }
1743
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
2,804,946✔
1744
    destroyTscObj(*pTscObj);
×
1745
    tscError("failed to wait sem, code:%s", terrstr());
×
1746
    return terrno;
×
1747
  }
1748
  if (pRequest->code != TSDB_CODE_SUCCESS) {
2,804,630✔
1749
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
7,889✔
1750
    tscError("failed to connect to server, reason: %s", errorMsg);
7,889✔
1751

1752
    terrno = pRequest->code;
7,889✔
1753
    destroyRequest(pRequest);
7,889✔
1754
    taos_close_internal(*pTscObj);
7,889✔
1755
    *pTscObj = NULL;
7,889✔
1756
    return terrno;
7,889✔
1757
  }
1758
  if (connType == CONN_TYPE__AUTH_TEST) {
2,797,058✔
1759
    terrno = TSDB_CODE_SUCCESS;
×
1760
    destroyRequest(pRequest);
×
1761
    taos_close_internal(*pTscObj);
×
1762
    *pTscObj = NULL;
×
1763
    return TSDB_CODE_SUCCESS;
×
1764
  }
1765

1766
  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
2,797,058✔
1767
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1768
  destroyRequest(pRequest);
2,797,058✔
1769
  return code;
2,797,058✔
1770
}
1771

1772
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
2,804,172✔
1773
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,804,172✔
1774
  if (*pMsgSendInfo == NULL) {
2,804,947✔
1775
    return terrno;
×
1776
  }
1777

1778
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
2,804,692✔
1779

1780
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
2,804,947✔
1781
  (*pMsgSendInfo)->requestId = pRequest->requestId;
2,804,947✔
1782
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
2,804,947✔
1783
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
2,804,548✔
1784
  if (NULL == (*pMsgSendInfo)->param) {
2,804,524✔
1785
    taosMemoryFree(*pMsgSendInfo);
×
1786
    return terrno;
×
1787
  }
1788

1789
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
2,804,524✔
1790

1791
  SConnectReq connectReq = {0};
2,804,524✔
1792
  STscObj*    pObj = pRequest->pTscObj;
2,804,524✔
1793

1794
  char* db = getDbOfConnection(pObj);
2,804,524✔
1795
  if (db != NULL) {
2,804,580✔
1796
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
1,206,324✔
1797
  } else if (terrno) {
1,598,256✔
1798
    taosMemoryFree(*pMsgSendInfo);
×
1799
    return terrno;
×
1800
  }
1801
  taosMemoryFreeClear(db);
2,804,692✔
1802

1803
  connectReq.connType = pObj->connType;
2,805,058✔
1804
  connectReq.pid = appInfo.pid;
2,805,058✔
1805
  connectReq.startTime = appInfo.startTime;
2,804,803✔
1806
  connectReq.totpCode = totpCode;
2,804,803✔
1807

1808
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
2,804,803✔
1809
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
2,804,528✔
1810
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
2,804,779✔
1811
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
2,804,778✔
1812

1813
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
2,804,778✔
1814
  void*   pReq = taosMemoryMalloc(contLen);
2,804,590✔
1815
  if (NULL == pReq) {
2,804,603✔
1816
    taosMemoryFree(*pMsgSendInfo);
×
1817
    return terrno;
×
1818
  }
1819

1820
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
2,804,603✔
1821
    taosMemoryFree(*pMsgSendInfo);
×
1822
    taosMemoryFree(pReq);
×
1823
    return terrno;
×
1824
  }
1825

1826
  (*pMsgSendInfo)->msgInfo.len = contLen;
2,804,580✔
1827
  (*pMsgSendInfo)->msgInfo.pData = pReq;
2,804,835✔
1828
  return TSDB_CODE_SUCCESS;
2,804,835✔
1829
}
1830

1831
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,316,402,345✔
1832
  if (NULL == pEpSet) {
1,316,402,345✔
1833
    return;
1,311,842,344✔
1834
  }
1835

1836
  switch (pSendInfo->target.type) {
4,560,001✔
1837
    case TARGET_TYPE_MNODE:
1,243✔
1838
      if (NULL == pTscObj) {
1,243✔
1839
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1840
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1841
        return;
×
1842
      }
1843

1844
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,243✔
1845
      SEpSet* pOrig = &originEpset;
1,243✔
1846
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
1,243✔
1847
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
1,243✔
1848
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
1,243✔
1849
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1850
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
1,243✔
1851
      break;
1,119,814✔
1852
    case TARGET_TYPE_VNODE: {
4,350,439✔
1853
      if (NULL == pTscObj) {
4,350,439✔
1854
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1855
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1856
        return;
×
1857
      }
1858

1859
      SCatalog* pCatalog = NULL;
4,350,439✔
1860
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,350,439✔
1861
      if (code != TSDB_CODE_SUCCESS) {
4,350,471✔
1862
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1863
                 tstrerror(code));
1864
        return;
×
1865
      }
1866

1867
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
4,350,471✔
1868
      if (code != TSDB_CODE_SUCCESS) {
4,350,503✔
1869
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
32✔
1870
                 tstrerror(code));
1871
        return;
×
1872
      }
1873
      taosMemoryFreeClear(pSendInfo->target.dbFName);
4,350,471✔
1874
      break;
4,350,503✔
1875
    }
1876
    default:
209,333✔
1877
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
209,333✔
1878
      break;
209,269✔
1879
  }
1880
}
1881

1882
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
1,316,938,905✔
1883
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
1,316,938,905✔
1884
  if (pMsg->info.ahandle == NULL) {
1,316,939,813✔
1885
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
527,162✔
1886
    rpcFreeCont(pMsg->pCont);
527,162✔
1887
    taosMemoryFree(pEpSet);
527,162✔
1888
    return TSDB_CODE_TSC_INTERNAL_ERROR;
527,162✔
1889
  }
1890

1891
  STscObj* pTscObj = NULL;
1,316,412,890✔
1892

1893
  STraceId* trace = &pMsg->info.traceId;
1,316,412,890✔
1894
  char      tbuf[40] = {0};
1,316,413,108✔
1895
  TRACE_TO_STR(trace, tbuf);
1,316,412,247✔
1896

1897
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
1,316,416,099✔
1898
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1899

1900
  if (pSendInfo->requestObjRefId != 0) {
1,316,416,370✔
1901
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
1,120,664,739✔
1902
    if (pRequest) {
1,120,666,108✔
1903
      if (pRequest->self != pSendInfo->requestObjRefId) {
1,109,887,234✔
1904
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1905
                 pSendInfo->requestObjRefId);
1906

1907
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1908
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1909
        }
1910
        rpcFreeCont(pMsg->pCont);
×
1911
        taosMemoryFree(pEpSet);
×
1912
        destroySendMsgInfo(pSendInfo);
×
1913
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1914
      }
1915
      pTscObj = pRequest->pTscObj;
1,109,885,544✔
1916
    }
1917
  }
1918

1919
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
1,316,411,415✔
1920

1921
  SDataBuf buf = {.msgType = pMsg->msgType,
1,316,400,164✔
1922
                  .len = pMsg->contLen,
1,316,404,068✔
1923
                  .pData = NULL,
1924
                  .handle = pMsg->info.handle,
1,316,408,402✔
1925
                  .handleRefId = pMsg->info.refId,
1,316,407,864✔
1926
                  .pEpSet = pEpSet};
1927

1928
  if (pMsg->contLen > 0) {
1,316,409,305✔
1929
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
1,274,384,877✔
1930
    if (buf.pData == NULL) {
1,274,372,931✔
1931
      pMsg->code = terrno;
×
1932
    } else {
1933
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
1,274,372,931✔
1934
    }
1935
  }
1936

1937
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
1,316,406,218✔
1938

1939
  if (pTscObj) {
1,316,393,842✔
1940
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
1,109,877,559✔
1941
    if (TSDB_CODE_SUCCESS != code) {
1,109,888,396✔
1942
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1943
      terrno = code;
×
1944
      pMsg->code = code;
×
1945
    }
1946
  }
1947

1948
  rpcFreeCont(pMsg->pCont);
1,316,404,679✔
1949
  destroySendMsgInfo(pSendInfo);
1,316,397,202✔
1950
  return TSDB_CODE_SUCCESS;
1,316,371,694✔
1951
}
1952

1953
int32_t doProcessMsgFromServer(void* param) {
1,316,941,111✔
1954
  AsyncArg* arg = (AsyncArg*)param;
1,316,941,111✔
1955
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
1,316,941,111✔
1956
  taosMemoryFree(arg);
1,316,894,121✔
1957
  return code;
1,316,912,152✔
1958
}
1959

1960
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,316,872,833✔
1961
  int32_t code = 0;
1,316,872,833✔
1962
  SEpSet* tEpSet = NULL;
1,316,872,833✔
1963

1964
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
1,316,872,833✔
1965

1966
  if (pEpSet != NULL) {
1,316,867,305✔
1967
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
4,561,015✔
1968
    if (NULL == tEpSet) {
4,560,693✔
1969
      code = terrno;
×
1970
      pMsg->code = terrno;
×
1971
      goto _exit;
×
1972
    }
1973
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
4,560,693✔
1974
  }
1975

1976
  // pMsg is response msg
1977
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
1,316,866,983✔
1978
    // restore origin code
1979
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
2,804,947✔
1980
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1981
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
2,804,947✔
1982
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1983
    }
1984
  } else {
1985
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1986
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
1,314,071,232✔
1987
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1988
    }
1989
  }
1990

1991
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
1,316,891,626✔
1992
  if (NULL == arg) {
1,316,889,191✔
1993
    code = terrno;
×
1994
    pMsg->code = code;
×
1995
    goto _exit;
×
1996
  }
1997

1998
  arg->msg = *pMsg;
1,316,889,191✔
1999
  arg->pEpset = tEpSet;
1,316,890,964✔
2000

2001
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
1,316,895,600✔
2002
    pMsg->code = code;
×
2003
    taosMemoryFree(arg);
×
2004
    goto _exit;
×
2005
  }
2006
  return;
1,316,914,366✔
2007

2008
_exit:
×
2009
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
2010
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
2011
  if (code != 0) {
×
2012
    tscError("failed to sched msg to tsc, tsc ready quit");
×
2013
  }
2014
}
2015

2016
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
35✔
2017
                        uint16_t port) {
2018
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
35✔
2019
  if (user == NULL) {
35✔
2020
    user = TSDB_DEFAULT_USER;
×
2021
  }
2022

2023
  if (pass == NULL) {
35✔
2024
    pass = TSDB_DEFAULT_PASS;
×
2025
  }
2026

2027
  STscObj* pObj = NULL;
35✔
2028
  int32_t  code = taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__QUERY, &pObj);
35✔
2029
  if (TSDB_CODE_SUCCESS == code) {
35✔
2030
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
35✔
2031
    if (NULL == rid) {
35✔
2032
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2033
      return NULL;
×
2034
    }
2035
    *rid = pObj->id;
35✔
2036
    return (TAOS*)rid;
35✔
2037
  } else {
2038
    terrno = code;
×
2039
  }
2040

2041
  return NULL;
×
2042
}
2043

2044
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2045
                      uint16_t port) {
2046
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2047
  if (user == NULL) {
×
2048
    user = TSDB_DEFAULT_USER;
×
2049
  }
2050

2051
  if (pass == NULL) {
×
2052
    pass = TSDB_DEFAULT_PASS;
×
2053
  }
2054

2055
  STscObj* pObj = NULL;
×
2056
  return taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2057
}
2058

2059
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) { return NULL; }
×
2060

2061
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
334✔
2062
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
334✔
2063
  if (user == NULL) {
334✔
2064
    user = TSDB_DEFAULT_USER;
×
2065
  }
2066

2067
  if (auth == NULL) {
334✔
2068
    tscError("No auth info is given, failed to connect to server");
×
2069
    return NULL;
×
2070
  }
2071

2072
  STscObj* pObj = NULL;
334✔
2073
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
334✔
2074
  if (TSDB_CODE_SUCCESS == code) {
334✔
2075
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
138✔
2076
    if (NULL == rid) {
138✔
2077
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2078
    }
2079
    *rid = pObj->id;
138✔
2080
    return (TAOS*)rid;
138✔
2081
  }
2082

2083
  return NULL;
196✔
2084
}
2085

2086
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2087
//                      const char* db, int dbLen, uint16_t port) {
2088
//   char ipStr[TSDB_EP_LEN] = {0};
2089
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2090
//   char userStr[TSDB_USER_LEN] = {0};
2091
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2092
//
2093
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2094
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2095
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2096
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2097
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2098
// }
2099

2100
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,147,483,647✔
2101
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2102
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2103

2104
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2105
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2106

2107
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647✔
2108
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647✔
2109
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
2,147,483,647✔
2110

2111
        if (IS_STR_DATA_BLOB(type)) {
2,147,483,647✔
2112
          pResultInfo->length[i] = blobDataLen(pStart);
×
2113
          pResultInfo->row[i] = blobDataVal(pStart);
×
2114
        } else {
2115
          pResultInfo->length[i] = varDataLen(pStart);
2,147,483,647✔
2116
          pResultInfo->row[i] = varDataVal(pStart);
2,147,483,647✔
2117
        }
2118
      } else {
2119
        pResultInfo->row[i] = NULL;
205,599,244✔
2120
        pResultInfo->length[i] = 0;
205,651,487✔
2121
      }
2122
    } else {
2123
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647✔
2124
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2125
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2126
      } else {
2127
        pResultInfo->row[i] = NULL;
702,553,595✔
2128
        pResultInfo->length[i] = 0;
706,099,547✔
2129
      }
2130
    }
2131
  }
2132
}
2,147,483,647✔
2133

2134
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2135
  if (pRequest == NULL) {
×
2136
    return NULL;
×
2137
  }
2138

2139
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2140
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2141
    // All data has returned to App already, no need to try again
2142
    if (pResultInfo->completed) {
×
2143
      pResultInfo->numOfRows = 0;
×
2144
      return NULL;
×
2145
    }
2146

2147
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2148
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2149

2150
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2151
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2152
      pResultInfo->numOfRows = 0;
×
2153
      return NULL;
×
2154
    }
2155

2156
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2157
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
2158
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2159
      pResultInfo->numOfRows = 0;
×
2160
      return NULL;
×
2161
    }
2162

2163
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2164
             ", complete:%d, QID:0x%" PRIx64,
2165
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2166

2167
    STscObj*            pTscObj = pRequest->pTscObj;
×
2168
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2169
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2170

2171
    if (pResultInfo->numOfRows == 0) {
×
2172
      return NULL;
×
2173
    }
2174
  }
2175

2176
  if (setupOneRowPtr) {
×
2177
    doSetOneRowPtr(pResultInfo);
×
2178
    pResultInfo->current += 1;
×
2179
  }
2180

2181
  return pResultInfo->row;
×
2182
}
2183

2184
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
91,013,245✔
2185
  tsem_t* sem = param;
91,013,245✔
2186
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
91,013,245✔
2187
    tscError("failed to post sem, code:%s", terrstr());
×
2188
  }
2189
}
91,013,245✔
2190

2191
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
1,338,803,612✔
2192
  if (pRequest == NULL) {
1,338,803,612✔
2193
    return NULL;
×
2194
  }
2195

2196
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,338,803,612✔
2197
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
1,338,869,252✔
2198
    // All data has returned to App already, no need to try again
2199
    if (pResultInfo->completed) {
162,015,829✔
2200
      pResultInfo->numOfRows = 0;
71,011,046✔
2201
      return NULL;
71,011,272✔
2202
    }
2203

2204
    // convert ucs4 to native multi-bytes string
2205
    pResultInfo->convertUcs4 = convertUcs4;
91,012,827✔
2206
    tsem_t sem;
90,082,614✔
2207
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
91,013,036✔
2208
      tscError("failed to init sem, code:%s", terrstr());
×
2209
    }
2210
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
91,011,706✔
2211
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
91,013,245✔
2212
      tscError("failed to wait sem, code:%s", terrstr());
×
2213
    }
2214
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
91,013,245✔
2215
      tscError("failed to destroy sem, code:%s", terrstr());
×
2216
    }
2217
    pRequest->inCallback = false;
91,012,636✔
2218
  }
2219

2220
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,267,888,417✔
2221
    return NULL;
6,941,112✔
2222
  } else {
2223
    if (setupOneRowPtr) {
1,260,946,291✔
2224
      doSetOneRowPtr(pResultInfo);
1,178,144,995✔
2225
      pResultInfo->current += 1;
1,178,149,947✔
2226
    }
2227

2228
    return pResultInfo->row;
1,260,949,049✔
2229
  }
2230
}
2231

2232
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
129,790,015✔
2233
  if (pResInfo->row == NULL) {
129,790,015✔
2234
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
114,709,394✔
2235
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
114,709,394✔
2236
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
114,709,088✔
2237
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
114,708,871✔
2238

2239
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
114,709,094✔
2240
      taosMemoryFree(pResInfo->row);
3,116✔
2241
      taosMemoryFree(pResInfo->pCol);
×
2242
      taosMemoryFree(pResInfo->length);
×
2243
      taosMemoryFree(pResInfo->convertBuf);
×
2244
      return terrno;
×
2245
    }
2246
  }
2247

2248
  return TSDB_CODE_SUCCESS;
129,790,103✔
2249
}
2250

2251
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
129,530,165✔
2252
  int32_t idx = -1;
129,530,165✔
2253
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
129,530,844✔
2254
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
129,529,910✔
2255

2256
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
736,895,718✔
2257
    int32_t type = pResultInfo->fields[i].type;
607,369,504✔
2258
    int32_t schemaBytes =
2259
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
607,369,301✔
2260

2261
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
607,366,567✔
2262
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
22,306,052✔
2263
      if (p == NULL) {
22,306,052✔
2264
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2265
        return terrno;
×
2266
      }
2267

2268
      pResultInfo->convertBuf[i] = p;
22,306,052✔
2269

2270
      SResultColumn* pCol = &pResultInfo->pCol[i];
22,306,052✔
2271
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2272
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2273
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2274

2275
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2276
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647✔
2277
            tscError(
148✔
2278
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2279
                "colLength[i]):%p",
2280
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2281
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
148✔
2282
            return TSDB_CODE_TSC_INTERNAL_ERROR;
75✔
2283
          }
2284

2285
          varDataSetLen(p, len);
2,147,483,647✔
2286
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2287
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2288
        }
2289
      }
2290

2291
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
22,305,977✔
2292
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
22,305,977✔
2293
    }
2294
  }
2295
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
129,530,195✔
2296
  return TSDB_CODE_SUCCESS;
129,530,333✔
2297
}
2298

2299
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
129,529,451✔
2300
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
736,892,754✔
2301
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
607,367,224✔
2302
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
607,366,257✔
2303
    int32_t       type = pFieldE->type;
607,366,597✔
2304
    int32_t       bufLen = 0;
607,350,705✔
2305
    char*         p = NULL;
607,350,705✔
2306
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
607,350,705✔
2307
      continue;
605,719,697✔
2308
    } else {
2309
      bufLen = 64;
1,645,457✔
2310
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
1,645,457✔
2311
      pFieldE->bytes = bufLen;
1,645,457✔
2312
      pField->bytes = bufLen;
1,645,457✔
2313
    }
2314
    if (!p) return terrno;
1,645,457✔
2315
    pResultInfo->convertBuf[i] = p;
1,645,457✔
2316

2317
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
1,026,247,811✔
2318
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
1,024,602,354✔
2319
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
1,024,602,354✔
2320
      p += bufLen;
1,024,602,354✔
2321
      if (TSDB_CODE_SUCCESS != code) {
1,024,602,354✔
2322
        return code;
×
2323
      }
2324
    }
2325
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
1,645,457✔
2326
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,645,457✔
2327
  }
2328
  return 0;
129,529,541✔
2329
}
2330

2331
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
399,552✔
2332
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
799,104✔
2333
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
399,552✔
2334
}
2335

2336
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
199,776✔
2337
  char*   p = (char*)pResultInfo->pData;
199,776✔
2338
  int32_t blockVersion = *(int32_t*)p;
199,776✔
2339

2340
  int32_t numOfRows = pResultInfo->numOfRows;
199,776✔
2341
  int32_t numOfCols = pResultInfo->numOfCols;
199,776✔
2342

2343
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2344
  // length |
2345
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
199,776✔
2346
  if (numOfCols != cols) {
199,776✔
2347
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2348
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2349
  }
2350

2351
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
199,776✔
2352
  int32_t* colLength = (int32_t*)(p + len);
199,776✔
2353
  len += sizeof(int32_t) * numOfCols;
199,776✔
2354

2355
  char* pStart = p + len;
199,776✔
2356
  for (int32_t i = 0; i < numOfCols; ++i) {
867,578✔
2357
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
667,802✔
2358

2359
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
667,802✔
2360
      int32_t* offset = (int32_t*)pStart;
236,522✔
2361
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
236,522✔
2362
      len += lenTmp;
236,522✔
2363
      pStart += lenTmp;
236,522✔
2364

2365
      int32_t estimateColLen = 0;
236,522✔
2366
      for (int32_t j = 0; j < numOfRows; ++j) {
1,229,812✔
2367
        if (offset[j] == -1) {
993,290✔
2368
          continue;
50,008✔
2369
        }
2370
        char* data = offset[j] + pStart;
943,282✔
2371

2372
        int32_t jsonInnerType = *data;
943,282✔
2373
        char*   jsonInnerData = data + CHAR_BYTES;
943,282✔
2374
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
943,282✔
2375
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
13,056✔
2376
        } else if (tTagIsJson(data)) {
930,226✔
2377
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
219,300✔
2378
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
710,926✔
2379
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
661,966✔
2380
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,960✔
2381
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
35,904✔
2382
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
13,056✔
2383
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
13,056✔
2384
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2385
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2386
        } else {
2387
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2388
          return -1;
×
2389
        }
2390
      }
2391
      len += TMAX(colLen, estimateColLen);
236,522✔
2392
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
431,280✔
2393
      int32_t lenTmp = numOfRows * sizeof(int32_t);
54,400✔
2394
      len += (lenTmp + colLen);
54,400✔
2395
      pStart += lenTmp;
54,400✔
2396
    } else {
2397
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
376,880✔
2398
      len += (lenTmp + colLen);
376,880✔
2399
      pStart += lenTmp;
376,880✔
2400
    }
2401
    pStart += colLen;
667,802✔
2402
  }
2403

2404
  // Ensure the complete structure of the block, including the blankfill field,
2405
  // even though it is not used on the client side.
2406
  len += sizeof(bool);
199,776✔
2407
  return len;
199,776✔
2408
}
2409

2410
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
129,789,194✔
2411
  int32_t numOfRows = pResultInfo->numOfRows;
129,789,194✔
2412
  int32_t numOfCols = pResultInfo->numOfCols;
129,789,582✔
2413
  bool    needConvert = false;
129,790,592✔
2414
  for (int32_t i = 0; i < numOfCols; ++i) {
738,427,729✔
2415
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
608,837,237✔
2416
      needConvert = true;
199,776✔
2417
      break;
199,776✔
2418
    }
2419
  }
2420

2421
  if (!needConvert) {
129,790,268✔
2422
    return TSDB_CODE_SUCCESS;
129,590,492✔
2423
  }
2424

2425
  tscDebug("start to convert form json format string");
199,776✔
2426

2427
  char*   p = (char*)pResultInfo->pData;
199,776✔
2428
  int32_t blockVersion = *(int32_t*)p;
199,776✔
2429
  int32_t dataLen = estimateJsonLen(pResultInfo);
199,776✔
2430
  if (dataLen <= 0) {
199,776✔
2431
    tscError("doConvertJson error: estimateJsonLen failed");
×
2432
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2433
  }
2434

2435
  taosMemoryFreeClear(pResultInfo->convertJson);
199,776✔
2436
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
199,776✔
2437
  if (pResultInfo->convertJson == NULL) return terrno;
199,776✔
2438
  char* p1 = pResultInfo->convertJson;
199,776✔
2439

2440
  int32_t totalLen = 0;
199,776✔
2441
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
199,776✔
2442
  if (numOfCols != cols) {
199,776✔
2443
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2444
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2445
  }
2446

2447
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
199,776✔
2448
  (void)memcpy(p1, p, len);
199,776✔
2449

2450
  p += len;
199,776✔
2451
  p1 += len;
199,776✔
2452
  totalLen += len;
199,776✔
2453

2454
  len = sizeof(int32_t) * numOfCols;
199,776✔
2455
  int32_t* colLength = (int32_t*)p;
199,776✔
2456
  int32_t* colLength1 = (int32_t*)p1;
199,776✔
2457
  (void)memcpy(p1, p, len);
199,776✔
2458
  p += len;
199,776✔
2459
  p1 += len;
199,776✔
2460
  totalLen += len;
199,776✔
2461

2462
  char* pStart = p;
199,776✔
2463
  char* pStart1 = p1;
199,776✔
2464
  for (int32_t i = 0; i < numOfCols; ++i) {
867,578✔
2465
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
667,802✔
2466
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
667,802✔
2467
    if (colLen >= dataLen) {
667,802✔
2468
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2469
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2470
    }
2471
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
667,802✔
2472
      int32_t* offset = (int32_t*)pStart;
236,522✔
2473
      int32_t* offset1 = (int32_t*)pStart1;
236,522✔
2474
      len = numOfRows * sizeof(int32_t);
236,522✔
2475
      (void)memcpy(pStart1, pStart, len);
236,522✔
2476
      pStart += len;
236,522✔
2477
      pStart1 += len;
236,522✔
2478
      totalLen += len;
236,522✔
2479

2480
      len = 0;
236,522✔
2481
      for (int32_t j = 0; j < numOfRows; ++j) {
1,229,812✔
2482
        if (offset[j] == -1) {
993,290✔
2483
          continue;
50,008✔
2484
        }
2485
        char* data = offset[j] + pStart;
943,282✔
2486

2487
        int32_t jsonInnerType = *data;
943,282✔
2488
        char*   jsonInnerData = data + CHAR_BYTES;
943,282✔
2489
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
943,282✔
2490
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
943,282✔
2491
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
13,056✔
2492
          varDataSetLen(dst, strlen(varDataVal(dst)));
13,056✔
2493
        } else if (tTagIsJson(data)) {
930,226✔
2494
          char* jsonString = NULL;
219,300✔
2495
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
219,300✔
2496
          if (jsonString == NULL) {
219,300✔
2497
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2498
            return terrno;
×
2499
          }
2500
          STR_TO_VARSTR(dst, jsonString);
219,300✔
2501
          taosMemoryFree(jsonString);
219,300✔
2502
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
710,926✔
2503
          *(char*)varDataVal(dst) = '\"';
661,966✔
2504
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
661,966✔
2505
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
661,966✔
2506
                                         pResultInfo->charsetCxt);
2507
          if (length <= 0) {
661,966✔
2508
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
544✔
2509
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2510
            length = 0;
544✔
2511
          }
2512
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
661,966✔
2513
                                                 varDataVal(tmp), length);
2514
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
661,966✔
2515
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
661,966✔
2516
          tscError("value:%s.", varDataVal(dst));
661,966✔
2517
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,960✔
2518
          double jsonVd = *(double*)(jsonInnerData);
35,904✔
2519
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
35,904✔
2520
          varDataSetLen(dst, strlen(varDataVal(dst)));
35,904✔
2521
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
13,056✔
2522
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
13,056✔
2523
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
13,056✔
2524
          varDataSetLen(dst, strlen(varDataVal(dst)));
13,056✔
2525
        } else {
2526
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2527
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2528
        }
2529

2530
        offset1[j] = len;
943,282✔
2531
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
943,282✔
2532
        len += varDataTLen(dst);
943,282✔
2533
      }
2534
      colLen1 = len;
236,522✔
2535
      totalLen += colLen1;
236,522✔
2536
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
236,522✔
2537
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
431,280✔
2538
      len = numOfRows * sizeof(int32_t);
54,400✔
2539
      (void)memcpy(pStart1, pStart, len);
54,400✔
2540
      pStart += len;
54,400✔
2541
      pStart1 += len;
54,400✔
2542
      totalLen += len;
54,400✔
2543
      totalLen += colLen;
54,400✔
2544
      (void)memcpy(pStart1, pStart, colLen);
54,400✔
2545
    } else {
2546
      len = BitmapLen(pResultInfo->numOfRows);
376,880✔
2547
      (void)memcpy(pStart1, pStart, len);
376,880✔
2548
      pStart += len;
376,880✔
2549
      pStart1 += len;
376,880✔
2550
      totalLen += len;
376,880✔
2551
      totalLen += colLen;
376,880✔
2552
      (void)memcpy(pStart1, pStart, colLen);
376,880✔
2553
    }
2554
    pStart += colLen;
667,802✔
2555
    pStart1 += colLen1;
667,802✔
2556
  }
2557

2558
  // Ensure the complete structure of the block, including the blankfill field,
2559
  // even though it is not used on the client side.
2560
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2561
  totalLen += sizeof(bool);
199,776✔
2562

2563
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
199,776✔
2564
  pResultInfo->pData = pResultInfo->convertJson;
199,776✔
2565
  return TSDB_CODE_SUCCESS;
199,776✔
2566
}
2567

2568
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
145,595,556✔
2569
  bool convertForDecimal = convertUcs4;
145,595,556✔
2570
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
145,595,556✔
2571
    tscError("setResultDataPtr paras error");
256✔
2572
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2573
  }
2574

2575
  if (pResultInfo->numOfRows == 0) {
145,595,597✔
2576
    return TSDB_CODE_SUCCESS;
15,805,444✔
2577
  }
2578

2579
  if (pResultInfo->pData == NULL) {
129,790,250✔
2580
    tscError("setResultDataPtr error: pData is NULL");
×
2581
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2582
  }
2583

2584
  int32_t code = doPrepareResPtr(pResultInfo);
129,790,121✔
2585
  if (code != TSDB_CODE_SUCCESS) {
129,789,715✔
2586
    return code;
×
2587
  }
2588
  code = doConvertJson(pResultInfo);
129,789,715✔
2589
  if (code != TSDB_CODE_SUCCESS) {
129,789,812✔
2590
    return code;
×
2591
  }
2592

2593
  char* p = (char*)pResultInfo->pData;
129,789,812✔
2594

2595
  // version:
2596
  int32_t blockVersion = *(int32_t*)p;
129,789,812✔
2597
  p += sizeof(int32_t);
129,789,812✔
2598

2599
  int32_t dataLen = *(int32_t*)p;
129,789,789✔
2600
  p += sizeof(int32_t);
129,790,209✔
2601

2602
  int32_t rows = *(int32_t*)p;
129,790,080✔
2603
  p += sizeof(int32_t);
129,790,103✔
2604

2605
  int32_t cols = *(int32_t*)p;
129,790,097✔
2606
  p += sizeof(int32_t);
129,789,774✔
2607

2608
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
129,790,000✔
2609
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
1,709✔
2610
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2611
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2612
  }
2613

2614
  int32_t hasColumnSeg = *(int32_t*)p;
129,788,800✔
2615
  p += sizeof(int32_t);
129,789,903✔
2616

2617
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
129,789,897✔
2618
  p += sizeof(uint64_t);
129,789,897✔
2619

2620
  // check fields
2621
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
738,679,652✔
2622
    int8_t type = *(int8_t*)p;
608,891,671✔
2623
    p += sizeof(int8_t);
608,890,401✔
2624

2625
    int32_t bytes = *(int32_t*)p;
608,890,880✔
2626
    p += sizeof(int32_t);
608,890,548✔
2627

2628
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
608,891,085✔
2629
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
318,129✔
2630
    }
2631
  }
2632

2633
  int32_t* colLength = (int32_t*)p;
129,789,962✔
2634
  p += sizeof(int32_t) * pResultInfo->numOfCols;
129,789,962✔
2635

2636
  char* pStart = p;
129,789,774✔
2637
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
738,683,696✔
2638
    if ((pStart - pResultInfo->pData) >= dataLen) {
608,893,690✔
2639
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2640
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2641
    }
2642
    if (blockVersion == BLOCK_VERSION_1) {
608,891,366✔
2643
      colLength[i] = htonl(colLength[i]);
467,950,589✔
2644
    }
2645
    if (colLength[i] >= dataLen) {
608,892,476✔
2646
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2647
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2648
    }
2649
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
608,891,609✔
2650
      tscError("invalid type %d", pResultInfo->fields[i].type);
935✔
2651
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2652
    }
2653
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
608,892,807✔
2654
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
146,684,205✔
2655
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
146,683,736✔
2656
    } else {
2657
      pResultInfo->pCol[i].nullbitmap = pStart;
462,211,103✔
2658
      pStart += BitmapLen(pResultInfo->numOfRows);
462,211,972✔
2659
    }
2660

2661
    pResultInfo->pCol[i].pData = pStart;
608,895,155✔
2662
    pResultInfo->length[i] =
1,217,789,707✔
2663
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
1,211,632,034✔
2664
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
608,894,360✔
2665

2666
    pStart += colLength[i];
608,893,912✔
2667
  }
2668

2669
  p = pStart;
129,790,565✔
2670
  // bool blankFill = *(bool*)p;
2671
  p += sizeof(bool);
129,790,565✔
2672
  int32_t offset = p - pResultInfo->pData;
129,790,565✔
2673
  if (offset > dataLen) {
129,789,980✔
2674
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2675
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2676
  }
2677

2678
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2679
  if (convertUcs4) {
129,789,980✔
2680
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
129,531,053✔
2681
  }
2682
#endif
2683
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
129,789,132✔
2684
    code = convertDecimalType(pResultInfo);
129,530,227✔
2685
  }
2686
  return code;
129,788,911✔
2687
}
2688

2689
char* getDbOfConnection(STscObj* pObj) {
750,673,240✔
2690
  terrno = TSDB_CODE_SUCCESS;
750,673,240✔
2691
  char* p = NULL;
750,674,383✔
2692
  (void)taosThreadMutexLock(&pObj->mutex);
750,674,383✔
2693
  size_t len = strlen(pObj->db);
750,682,456✔
2694
  if (len > 0) {
750,682,665✔
2695
    p = taosStrndup(pObj->db, tListLen(pObj->db));
678,693,852✔
2696
    if (p == NULL) {
678,689,808✔
2697
      tscError("failed to taosStrndup db name");
×
2698
    }
2699
  }
2700

2701
  (void)taosThreadMutexUnlock(&pObj->mutex);
750,678,621✔
2702
  return p;
750,672,191✔
2703
}
2704

2705
void setConnectionDB(STscObj* pTscObj, const char* db) {
2,581,944✔
2706
  if (db == NULL || pTscObj == NULL) {
2,581,944✔
2707
    tscError("setConnectionDB para is NULL");
×
2708
    return;
×
2709
  }
2710

2711
  (void)taosThreadMutexLock(&pTscObj->mutex);
2,581,944✔
2712
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
2,582,170✔
2713
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
2,581,944✔
2714
}
2715

2716
void resetConnectDB(STscObj* pTscObj) {
×
2717
  if (pTscObj == NULL) {
×
2718
    return;
×
2719
  }
2720

2721
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2722
  pTscObj->db[0] = 0;
×
2723
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2724
}
2725

2726
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
115,137,985✔
2727
                              bool isStmt) {
2728
  if (pResultInfo == NULL || pRsp == NULL) {
115,137,985✔
UNCOV
2729
    tscError("setQueryResultFromRsp paras is null");
×
2730
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2731
  }
2732

2733
  taosMemoryFreeClear(pResultInfo->pRspMsg);
115,137,985✔
2734
  pResultInfo->pRspMsg = (const char*)pRsp;
115,137,985✔
2735
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
115,137,985✔
2736
  pResultInfo->current = 0;
115,137,985✔
2737
  pResultInfo->completed = (pRsp->completed == 1);
115,137,985✔
2738
  pResultInfo->precision = pRsp->precision;
115,137,985✔
2739

2740
  // decompress data if needed
2741
  int32_t payloadLen = htonl(pRsp->payloadLen);
115,137,985✔
2742

2743
  if (pRsp->compressed) {
115,137,985✔
2744
    if (pResultInfo->decompBuf == NULL) {
×
2745
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
2746
      if (pResultInfo->decompBuf == NULL) {
×
2747
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2748
        return terrno;
×
2749
      }
2750
      pResultInfo->decompBufSize = payloadLen;
×
2751
    } else {
2752
      if (pResultInfo->decompBufSize < payloadLen) {
×
2753
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
2754
        if (p == NULL) {
×
2755
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2756
          return terrno;
×
2757
        }
2758

2759
        pResultInfo->decompBuf = p;
×
2760
        pResultInfo->decompBufSize = payloadLen;
×
2761
      }
2762
    }
2763
  }
2764

2765
  if (payloadLen > 0) {
115,137,985✔
2766
    int32_t compLen = *(int32_t*)pRsp->data;
99,332,933✔
2767
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
99,332,933✔
2768

2769
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
99,332,933✔
2770

2771
    if (pRsp->compressed && compLen < rawLen) {
99,332,707✔
2772
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
2773
      if (len < 0) {
×
2774
        tscError("tsDecompressString failed");
×
2775
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2776
      }
2777
      if (len != rawLen) {
×
2778
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2779
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2780
      }
2781
      pResultInfo->pData = pResultInfo->decompBuf;
×
2782
      pResultInfo->payloadLen = rawLen;
×
2783
    } else {
2784
      pResultInfo->pData = pStart;
99,332,933✔
2785
      pResultInfo->payloadLen = htonl(pRsp->compLen);
99,332,933✔
2786
      if (pRsp->compLen != pRsp->payloadLen) {
99,332,933✔
2787
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2788
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2789
      }
2790
    }
2791
  }
2792

2793
  // TODO handle the compressed case
2794
  pResultInfo->totalRows += pResultInfo->numOfRows;
115,137,985✔
2795

2796
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
115,137,985✔
2797
  return code;
115,136,858✔
2798
}
2799

2800
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
479✔
2801
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
479✔
2802
  void*              clientRpc = NULL;
479✔
2803
  SServerStatusRsp   statusRsp = {0};
479✔
2804
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
479✔
2805
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
479✔
2806
  SRpcMsg  rpcRsp = {0};
479✔
2807
  SRpcInit rpcInit = {0};
479✔
2808
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
479✔
2809

2810
  rpcInit.label = "CHK";
479✔
2811
  rpcInit.numOfThreads = 1;
479✔
2812
  rpcInit.cfp = NULL;
479✔
2813
  rpcInit.sessions = 16;
479✔
2814
  rpcInit.connType = TAOS_CONN_CLIENT;
479✔
2815
  rpcInit.idleTime = tsShellActivityTimer * 1000;
479✔
2816
  rpcInit.compressSize = tsCompressMsgSize;
479✔
2817
  rpcInit.user = "_dnd";
479✔
2818

2819
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
479✔
2820
  connLimitNum = TMAX(connLimitNum, 10);
479✔
2821
  connLimitNum = TMIN(connLimitNum, 500);
479✔
2822
  rpcInit.connLimitNum = connLimitNum;
479✔
2823
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
479✔
2824
  rpcInit.readTimeout = tsReadTimeout;
479✔
2825
  rpcInit.ipv6 = tsEnableIpv6;
479✔
2826
  rpcInit.enableSSL = tsEnableTLS;
479✔
2827

2828
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
479✔
2829
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
479✔
2830
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
479✔
2831
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
479✔
2832
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
479✔
2833

2834
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
479✔
2835
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2836
    goto _OVER;
×
2837
  }
2838

2839
  clientRpc = rpcOpen(&rpcInit);
479✔
2840
  if (clientRpc == NULL) {
479✔
2841
    code = terrno;
×
2842
    tscError("failed to init server status client since %s", tstrerror(code));
×
2843
    goto _OVER;
×
2844
  }
2845

2846
  if (fqdn == NULL) {
479✔
2847
    fqdn = tsLocalFqdn;
479✔
2848
  }
2849

2850
  if (port == 0) {
479✔
2851
    port = tsServerPort;
479✔
2852
  }
2853

2854
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
479✔
2855
  epSet.eps[0].port = (uint16_t)port;
479✔
2856
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
479✔
2857
  if (TSDB_CODE_SUCCESS != ret) {
479✔
2858
    tscError("failed to send recv since %s", tstrerror(ret));
×
2859
    goto _OVER;
×
2860
  }
2861

2862
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
479✔
2863
    tscError("failed to send server status req since %s", terrstr());
137✔
2864
    goto _OVER;
137✔
2865
  }
2866

2867
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
342✔
2868
    tscError("failed to parse server status rsp since %s", terrstr());
×
2869
    goto _OVER;
×
2870
  }
2871

2872
  code = statusRsp.statusCode;
342✔
2873
  if (details != NULL) {
342✔
2874
    tstrncpy(details, statusRsp.details, maxlen);
342✔
2875
  }
2876

2877
_OVER:
409✔
2878
  if (clientRpc != NULL) {
479✔
2879
    rpcClose(clientRpc);
479✔
2880
  }
2881
  if (rpcRsp.pCont != NULL) {
479✔
2882
    rpcFreeCont(rpcRsp.pCont);
342✔
2883
  }
2884
  return code;
479✔
2885
}
2886

2887
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
1,284✔
2888
                      int32_t acctId, char* db) {
2889
  SName name = {0};
1,284✔
2890

2891
  if (len1 <= 0) {
1,284✔
2892
    return -1;
×
2893
  }
2894

2895
  const char* dbName = db;
1,284✔
2896
  const char* tbName = NULL;
1,284✔
2897
  int32_t     dbLen = 0;
1,284✔
2898
  int32_t     tbLen = 0;
1,284✔
2899
  if (len2 > 0) {
1,284✔
2900
    dbName = str + pos1;
×
2901
    dbLen = len1;
×
2902
    tbName = str + pos2;
×
2903
    tbLen = len2;
×
2904
  } else {
2905
    dbLen = strlen(db);
1,284✔
2906
    tbName = str + pos1;
1,284✔
2907
    tbLen = len1;
1,284✔
2908
  }
2909

2910
  if (dbLen <= 0 || tbLen <= 0) {
1,284✔
2911
    return -1;
×
2912
  }
2913

2914
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
1,284✔
2915
    return -1;
×
2916
  }
2917

2918
  if (tNameAddTbName(&name, tbName, tbLen)) {
1,284✔
2919
    return -1;
×
2920
  }
2921

2922
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,284✔
2923
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
1,284✔
2924

2925
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
1,284✔
2926
  if (pDb) {
1,284✔
2927
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2928
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2929
    }
2930
  } else {
2931
    STablesReq db;
1,284✔
2932
    db.pTables = taosArrayInit(20, sizeof(SName));
1,284✔
2933
    if (NULL == db.pTables) {
1,284✔
2934
      return terrno;
×
2935
    }
2936
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
1,284✔
2937
    if (NULL == taosArrayPush(db.pTables, &name)) {
2,568✔
2938
      return terrno;
×
2939
    }
2940
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
1,284✔
2941
  }
2942

2943
  return TSDB_CODE_SUCCESS;
1,284✔
2944
}
2945

2946
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
1,284✔
2947
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,284✔
2948
  if (NULL == pHash) {
1,284✔
2949
    return terrno;
×
2950
  }
2951

2952
  bool    inEscape = false;
1,284✔
2953
  int32_t code = 0;
1,284✔
2954
  void*   pIter = NULL;
1,284✔
2955

2956
  int32_t vIdx = 0;
1,284✔
2957
  int32_t vPos[2];
1,284✔
2958
  int32_t vLen[2];
1,284✔
2959

2960
  (void)memset(vPos, -1, sizeof(vPos));
1,284✔
2961
  (void)memset(vLen, 0, sizeof(vLen));
1,284✔
2962

2963
  for (int32_t i = 0;; ++i) {
6,420✔
2964
    if (0 == *(tbList + i)) {
6,420✔
2965
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
1,284✔
2966
        vLen[vIdx] = i - vPos[vIdx];
1,284✔
2967
      }
2968

2969
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
1,284✔
2970
      if (code) {
1,284✔
2971
        goto _return;
×
2972
      }
2973

2974
      break;
1,284✔
2975
    }
2976

2977
    if ('`' == *(tbList + i)) {
5,136✔
2978
      inEscape = !inEscape;
×
2979
      if (!inEscape) {
×
2980
        if (vPos[vIdx] >= 0) {
×
2981
          vLen[vIdx] = i - vPos[vIdx];
×
2982
        } else {
2983
          goto _return;
×
2984
        }
2985
      }
2986

2987
      continue;
×
2988
    }
2989

2990
    if (inEscape) {
5,136✔
2991
      if (vPos[vIdx] < 0) {
×
2992
        vPos[vIdx] = i;
×
2993
      }
2994
      continue;
×
2995
    }
2996

2997
    if ('.' == *(tbList + i)) {
5,136✔
2998
      if (vPos[vIdx] < 0) {
×
2999
        goto _return;
×
3000
      }
3001
      if (vLen[vIdx] <= 0) {
×
3002
        vLen[vIdx] = i - vPos[vIdx];
×
3003
      }
3004
      vIdx++;
×
3005
      if (vIdx >= 2) {
×
3006
        goto _return;
×
3007
      }
3008
      continue;
×
3009
    }
3010

3011
    if (',' == *(tbList + i)) {
5,136✔
3012
      if (vPos[vIdx] < 0) {
×
3013
        goto _return;
×
3014
      }
3015
      if (vLen[vIdx] <= 0) {
×
3016
        vLen[vIdx] = i - vPos[vIdx];
×
3017
      }
3018

3019
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
3020
      if (code) {
×
3021
        goto _return;
×
3022
      }
3023

3024
      (void)memset(vPos, -1, sizeof(vPos));
×
3025
      (void)memset(vLen, 0, sizeof(vLen));
×
3026
      vIdx = 0;
×
3027
      continue;
×
3028
    }
3029

3030
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
5,136✔
3031
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
3032
        vLen[vIdx] = i - vPos[vIdx];
×
3033
      }
3034
      continue;
×
3035
    }
3036

3037
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
5,136✔
3038
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
642✔
3039
      if (vLen[vIdx] > 0) {
5,136✔
3040
        goto _return;
×
3041
      }
3042
      if (vPos[vIdx] < 0) {
5,136✔
3043
        vPos[vIdx] = i;
1,284✔
3044
      }
3045
      continue;
5,136✔
3046
    }
3047

3048
    goto _return;
×
3049
  }
3050

3051
  int32_t dbNum = taosHashGetSize(pHash);
1,284✔
3052
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
1,284✔
3053
  if (NULL == pReq) {
1,284✔
3054
    TSC_ERR_JRET(terrno);
×
3055
  }
3056
  pIter = taosHashIterate(pHash, NULL);
1,284✔
3057
  while (pIter) {
2,568✔
3058
    STablesReq* pDb = (STablesReq*)pIter;
1,284✔
3059
    if (NULL == taosArrayPush(*pReq, pDb)) {
2,568✔
3060
      TSC_ERR_JRET(terrno);
×
3061
    }
3062
    pIter = taosHashIterate(pHash, pIter);
1,284✔
3063
  }
3064

3065
  taosHashCleanup(pHash);
1,284✔
3066

3067
  return TSDB_CODE_SUCCESS;
1,284✔
3068

3069
_return:
×
3070

3071
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3072

3073
  pIter = taosHashIterate(pHash, NULL);
×
3074
  while (pIter) {
×
3075
    STablesReq* pDb = (STablesReq*)pIter;
×
3076
    taosArrayDestroy(pDb->pTables);
×
3077
    pIter = taosHashIterate(pHash, pIter);
×
3078
  }
3079

3080
  taosHashCleanup(pHash);
×
3081

3082
  return terrno;
×
3083
}
3084

3085
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
1,284✔
3086
  SSyncQueryParam* pParam = param;
1,284✔
3087
  pParam->pRequest->code = code;
1,284✔
3088

3089
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,284✔
3090
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3091
  }
3092
}
1,284✔
3093

3094
void syncQueryFn(void* param, void* res, int32_t code) {
743,317,528✔
3095
  SSyncQueryParam* pParam = param;
743,317,528✔
3096
  pParam->pRequest = res;
743,317,528✔
3097

3098
  if (pParam->pRequest) {
743,320,290✔
3099
    pParam->pRequest->code = code;
743,302,904✔
3100
    clientOperateReport(pParam->pRequest);
743,310,256✔
3101
  }
3102

3103
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
743,300,979✔
3104
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3105
  }
3106
}
743,322,962✔
3107

3108
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
742,781,978✔
3109
                        int8_t source) {
3110
  if (sql == NULL || NULL == fp) {
742,781,978✔
3111
    terrno = TSDB_CODE_INVALID_PARA;
1,665✔
3112
    if (fp) {
×
3113
      fp(param, NULL, terrno);
×
3114
    }
3115

3116
    return;
×
3117
  }
3118

3119
  size_t sqlLen = strlen(sql);
742,780,678✔
3120
  if (sqlLen > (size_t)tsMaxSQLLength) {
742,780,678✔
3121
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
1,280✔
3122
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1,280✔
3123
    fp(param, NULL, terrno);
1,280✔
3124
    return;
1,280✔
3125
  }
3126

3127
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
742,779,398✔
3128

3129
  SRequestObj* pRequest = NULL;
742,779,740✔
3130
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
742,782,778✔
3131
  if (code != TSDB_CODE_SUCCESS) {
742,783,768✔
3132
    terrno = code;
×
3133
    fp(param, NULL, terrno);
×
3134
    return;
×
3135
  }
3136

3137
  code = connCheckAndUpateMetric(connId);
742,783,768✔
3138
  if (code != TSDB_CODE_SUCCESS) {
742,782,855✔
3139
    terrno = code;
×
3140
    fp(param, NULL, terrno);
×
3141
    return;
×
3142
  }
3143

3144
  pRequest->source = source;
742,782,855✔
3145
  pRequest->body.queryFp = fp;
742,783,353✔
3146
  doAsyncQuery(pRequest, false);
742,782,476✔
3147
}
3148

3149
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
834✔
3150
                                 int64_t reqid) {
3151
  if (sql == NULL || NULL == fp) {
834✔
3152
    terrno = TSDB_CODE_INVALID_PARA;
×
3153
    if (fp) {
×
3154
      fp(param, NULL, terrno);
×
3155
    }
3156

3157
    return;
×
3158
  }
3159

3160
  size_t sqlLen = strlen(sql);
834✔
3161
  if (sqlLen > (size_t)tsMaxSQLLength) {
834✔
3162
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
×
3163
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3164
    fp(param, NULL, terrno);
×
3165
    return;
×
3166
  }
3167

3168
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
834✔
3169

3170
  SRequestObj* pRequest = NULL;
834✔
3171
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
834✔
3172
  if (code != TSDB_CODE_SUCCESS) {
834✔
3173
    terrno = code;
×
3174
    fp(param, NULL, terrno);
×
3175
    return;
×
3176
  }
3177

3178
  code = connCheckAndUpateMetric(connId);
834✔
3179

3180
  if (code != TSDB_CODE_SUCCESS) {
834✔
3181
    terrno = code;
×
3182
    fp(param, NULL, terrno);
×
3183
    return;
×
3184
  }
3185

3186
  pRequest->body.queryFp = fp;
834✔
3187

3188
  doAsyncQuery(pRequest, false);
834✔
3189
}
3190

3191
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
742,741,512✔
3192
  if (NULL == taos) {
742,741,512✔
3193
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3194
    return NULL;
×
3195
  }
3196

3197
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
742,741,512✔
3198
  if (NULL == param) {
742,748,333✔
3199
    return NULL;
×
3200
  }
3201

3202
  int32_t code = tsem_init(&param->sem, 0, 0);
742,748,333✔
3203
  if (TSDB_CODE_SUCCESS != code) {
742,742,724✔
3204
    taosMemoryFree(param);
×
3205
    return NULL;
×
3206
  }
3207

3208
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
742,742,724✔
3209
  code = tsem_wait(&param->sem);
742,738,972✔
3210
  if (TSDB_CODE_SUCCESS != code) {
742,749,156✔
3211
    taosMemoryFree(param);
×
3212
    return NULL;
×
3213
  }
3214
  code = tsem_destroy(&param->sem);
742,749,156✔
3215
  if (TSDB_CODE_SUCCESS != code) {
742,750,013✔
3216
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3217
  }
3218

3219
  SRequestObj* pRequest = NULL;
742,750,057✔
3220
  if (param->pRequest != NULL) {
742,750,057✔
3221
    param->pRequest->syncQuery = true;
742,750,354✔
3222
    pRequest = param->pRequest;
742,749,773✔
3223
    param->pRequest->inCallback = false;
742,747,889✔
3224
  }
3225
  taosMemoryFree(param);
742,748,194✔
3226

3227
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3228
  //          *(int64_t*)taos, pRequest);
3229

3230
  return pRequest;
742,747,781✔
3231
}
3232

3233
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
834✔
3234
  if (NULL == taos) {
834✔
3235
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3236
    return NULL;
×
3237
  }
3238

3239
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
834✔
3240
  if (param == NULL) {
834✔
3241
    return NULL;
×
3242
  }
3243
  int32_t code = tsem_init(&param->sem, 0, 0);
834✔
3244
  if (TSDB_CODE_SUCCESS != code) {
834✔
3245
    taosMemoryFree(param);
×
3246
    return NULL;
×
3247
  }
3248

3249
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
834✔
3250
  code = tsem_wait(&param->sem);
834✔
3251
  if (TSDB_CODE_SUCCESS != code) {
834✔
3252
    taosMemoryFree(param);
×
3253
    return NULL;
×
3254
  }
3255
  SRequestObj* pRequest = NULL;
834✔
3256
  if (param->pRequest != NULL) {
834✔
3257
    param->pRequest->syncQuery = true;
834✔
3258
    pRequest = param->pRequest;
834✔
3259
  }
3260
  taosMemoryFree(param);
834✔
3261

3262
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3263
  //   *(int64_t*)taos, pRequest);
3264

3265
  return pRequest;
834✔
3266
}
3267

3268
static void fetchCallback(void* pResult, void* param, int32_t code) {
112,217,659✔
3269
  SRequestObj* pRequest = (SRequestObj*)param;
112,217,659✔
3270

3271
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
112,217,659✔
3272

3273
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
112,217,659✔
3274
           tstrerror(code), pRequest->requestId);
3275

3276
  pResultInfo->pData = pResult;
112,217,659✔
3277
  pResultInfo->numOfRows = 0;
112,217,659✔
3278

3279
  if (code != TSDB_CODE_SUCCESS) {
112,217,659✔
3280
    pRequest->code = code;
×
3281
    taosMemoryFreeClear(pResultInfo->pData);
×
3282
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3283
    return;
×
3284
  }
3285

3286
  if (pRequest->code != TSDB_CODE_SUCCESS) {
112,217,659✔
3287
    taosMemoryFreeClear(pResultInfo->pData);
×
3288
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3289
    return;
×
3290
  }
3291

3292
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
113,151,091✔
3293
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
112,217,659✔
3294
  if (pRequest->code != TSDB_CODE_SUCCESS) {
112,217,021✔
3295
    pResultInfo->numOfRows = 0;
75✔
3296
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
75✔
3297
             tstrerror(pRequest->code), pRequest->requestId);
3298
  } else {
3299
    tscDebug(
112,216,886✔
3300
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3301
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3302

3303
    STscObj*            pTscObj = pRequest->pTscObj;
112,217,089✔
3304
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
112,217,584✔
3305
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
112,217,584✔
3306
  }
3307

3308
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
112,217,659✔
3309
}
3310

3311
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
125,902,883✔
3312
  pRequest->body.fetchFp = fp;
125,902,883✔
3313
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
125,903,521✔
3314

3315
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
125,902,877✔
3316

3317
  // this query has no results or error exists, return directly
3318
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
125,902,791✔
UNCOV
3319
    pResultInfo->numOfRows = 0;
×
3320
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3321
    return;
1,779✔
3322
  }
3323

3324
  // all data has returned to App already, no need to try again
3325
  if (pResultInfo->completed) {
125,903,086✔
3326
    // it is a local executed query, no need to do async fetch
3327
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
13,685,862✔
3328
      if (pResultInfo->localResultFetched) {
1,572,766✔
3329
        pResultInfo->numOfRows = 0;
786,383✔
3330
        pResultInfo->current = 0;
786,383✔
3331
      } else {
3332
        pResultInfo->localResultFetched = true;
786,383✔
3333
      }
3334
    } else {
3335
      pResultInfo->numOfRows = 0;
12,113,096✔
3336
    }
3337

3338
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
13,685,862✔
3339
    return;
13,685,862✔
3340
  }
3341

3342
  SSchedulerReq req = {
112,217,224✔
3343
      .syncReq = false,
3344
      .fetchFp = fetchCallback,
3345
      .cbParam = pRequest,
3346
  };
3347

3348
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
112,217,224✔
3349
  if (TSDB_CODE_SUCCESS != code) {
112,217,032✔
3350
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3351
    // pRequest->body.fetchFp(param, pRequest, code);
3352
  }
3353
}
3354

3355
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
743,290,306✔
3356
  pRequest->inCallback = true;
743,290,306✔
3357
  int64_t this = pRequest->self;
743,298,150✔
3358
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
743,279,777✔
3359
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
84,150✔
3360
    code = TSDB_CODE_SUCCESS;
×
3361
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3362
  }
3363

3364
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
743,279,777✔
3365
           pRequest);
3366

3367
  if (pRequest->body.queryFp != NULL) {
743,281,345✔
3368
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
743,293,527✔
3369
  }
3370

3371
  SRequestObj* pReq = acquireRequest(this);
743,296,289✔
3372
  if (pReq != NULL) {
743,299,054✔
3373
    pReq->inCallback = false;
741,661,448✔
3374
    (void)releaseRequest(this);
741,662,408✔
3375
  }
3376
}
743,299,644✔
3377

3378
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
571,256✔
3379
                       SParseSqlRes* pRes) {
3380
#ifndef TD_ENTERPRISE
3381
  return TSDB_CODE_SUCCESS;
3382
#else
3383
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
571,256✔
3384
#endif
3385
}
3386

3387
void updateConnAccessInfo(SConnAccessInfo *pInfo) {
2,804,833✔
3388
  if (pInfo == NULL) {
2,804,833✔
3389
    return;
×
3390
  }
3391
  int64_t ts = taosGetTimestampMs();
2,804,834✔
3392
  if (pInfo->startTime == 0) {
2,804,834✔
3393
    pInfo->startTime = ts;
2,804,835✔
3394
  }
3395
  pInfo->lastAccessTime = ts;
2,804,834✔
3396
}
3397
 
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc