• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4271

10 Jun 2025 09:45AM UTC coverage: 62.985% (+0.002%) from 62.983%
#4271

push

travis-ci

web-flow
Merge pull request #31337 from taosdata/newtest_3.0

fix TD-35057 and TD-35346

158179 of 319671 branches covered (49.48%)

Branch coverage included in aggregate %.

243860 of 318637 relevant lines covered (76.53%)

18624660.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.76
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmsgtype.h"
28
#include "tpagedbuf.h"
29
#include "tref.h"
30
#include "tsched.h"
31
#include "tversion.h"
32

33
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
34
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
35

36
void setQueryRequest(int64_t rId) {
1,601,326✔
37
  SRequestObj* pReq = acquireRequest(rId);
1,601,326✔
38
  if (pReq != NULL) {
1,601,337✔
39
    pReq->isQuery = true;
1,601,320✔
40
    (void)releaseRequest(rId);
1,601,320✔
41
  }
42
}
1,601,328✔
43

44
/*
 * Check that str is a non-empty string no longer than maxsize bytes
 * (terminator not counted).
 *
 * Returns false for NULL, for the empty string and for strings longer than maxsize;
 * true otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so "empty" is the only case below 1; spell it out as == 0
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}
56

57
/* A user name is valid when it is non-empty and at most TSDB_USER_LEN - 1 bytes long. */
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}
33,291✔
58

59
/* A password is valid when it is non-empty and at most TSDB_PASSWORD_MAX_LEN bytes long. */
static bool validatePassword(const char* passwd) {
  const size_t maxPasswdLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxPasswdLen);
}
33,283✔
60

61
/* A database name is valid when it is non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long. */
static bool validateDbName(const char* db) {
  const size_t maxDbNameLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameLen);
}
1,547✔
62

63
/*
 * Build the "user:auth:ip:port" string that identifies a cluster instance.
 *
 * The text is formatted into a 512-byte stack buffer (longer input is truncated by
 * snprintf) and then duplicated with taosStrdup(); the caller owns the returned copy.
 * Returns whatever taosStrdup() returns, which the caller checks against NULL.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  // taos_connect_internal() passes ip straight through even when it is NULL, and handing
  // NULL to %s is undefined behaviour. Substitute a literal instead.
  // NOTE(review): "(null)" is what common libc implementations print for a NULL %s
  // argument, so keys should stay unchanged on those platforms - confirm.
  const char* host = (NULL != ip) ? ip : "(null)";

  char key[512] = {0};
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, host, port);
  return taosStrdup(key);
}
68

69
bool chkRequestKilled(void* param) {
38,095,707✔
70
  bool         killed = false;
38,095,707✔
71
  SRequestObj* pRequest = acquireRequest((int64_t)param);
38,095,707✔
72
  if (NULL == pRequest || pRequest->killed) {
38,386,921!
73
    killed = true;
×
74
  }
75

76
  (void)releaseRequest((int64_t)param);
38,386,921✔
77

78
  return killed;
38,336,373✔
79
}
80

81
void cleanupAppInfo() {
18,509✔
82
  taosHashCleanup(appInfo.pInstMap);
18,509✔
83
  taosHashCleanup(appInfo.pInstMapByClusterId);
18,509✔
84
  tscInfo("cluster instance map cleaned");
18,509!
85
}
18,509✔
86

87
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
88
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
89

90
/*
 * Validate the connection arguments, find or create the shared application instance
 * for the target cluster, and open a connection on it.
 *
 * ip       server endpoint; when NULL the configured tsFirst/tsSecond endpoints are used
 * user     user name (length-checked)
 * pass     plain password; only used (and length-checked) when auth is NULL
 * auth     already-encrypted secret; when non-NULL it is copied instead of encrypting pass
 * db       optional database name; quotes are stripped from a local copy
 * port     when non-zero, overrides the port of the first two endpoints
 * connType forwarded to taosConnectImpl()
 * pObj     forwarded to taosConnectImpl()
 *
 * Returns the error code of the first failing step, otherwise the result of
 * taosConnectImpl().
 */
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());
  if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
    }

    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  // From here on `key` is heap memory owned by this function until it is handed to the
  // new instance (p->instKey) or released on the way out.
  char* key = getClusterKey(user, secretEncrypt, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  // db may be NULL; never hand a NULL pointer to %s
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user, (NULL != db) ? db : "(null)", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  // Instance to connect through; captured while appInfo.mutex is held so that the
  // hash-map slot is never dereferenced after the lock has been released.
  SAppInstInfo* pAppInst = NULL;

  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    taosMemoryFreeClear(key);  // this early exit bypasses _return, so release key here
    TSC_ERR_RET(code);
  }

  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  if (pInst == NULL) {
    // first connection for this key: build a new application instance
    SAppInstInfo* p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // ownership of key moves to the instance; clear the local so it is not freed below
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);

    pAppInst = p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);

    pAppInst = *pInst;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    // keep the original failure code; an unlock error here is deliberately ignored
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  }

  code = taosThreadMutexUnlock(&appInfo.mutex);
  taosMemoryFreeClear(key);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    return code;
  }
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, pAppInst, connType, pObj);
}
214

215
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
216
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
217
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
218
//     return *ppAppInstInfo;
219
//   } else {
220
//     return NULL;
221
//   }
222
// }
223

224
/*
 * Destroy the semaphore embedded in param and release param itself.
 * A NULL param is ignored; a semaphore-destroy failure is logged and the
 * memory is released regardless.
 */
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }

    taosMemoryFree(param);
  }
}
231

232
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
11,630,699✔
233
                     SRequestObj** pRequest, int64_t reqid) {
234
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
11,630,699✔
235
  if (TSDB_CODE_SUCCESS != code) {
11,643,530!
236
    tscError("failed to malloc sqlObj, %s", sql);
×
237
    return code;
×
238
  }
239

240
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
11,643,530!
241
  if ((*pRequest)->sqlstr == NULL) {
11,633,303!
242
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
243
    destroyRequest(*pRequest);
×
244
    *pRequest = NULL;
×
245
    return terrno;
×
246
  }
247

248
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
11,633,303✔
249
  (*pRequest)->sqlstr[sqlLen] = 0;
11,639,599✔
250
  (*pRequest)->sqlLen = sqlLen;
11,639,599✔
251
  (*pRequest)->validateOnly = validateSql;
11,639,599✔
252
  (*pRequest)->isStmtBind = false;
11,639,599✔
253

254
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
11,639,599✔
255

256
  STscObj* pTscObj = (*pRequest)->pTscObj;
11,639,599✔
257
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
11,639,599✔
258
                             sizeof((*pRequest)->self));
259
  if (err) {
11,631,702!
260
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
261
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
262
    destroyRequest(*pRequest);
×
263
    *pRequest = NULL;
×
264
    return terrno;
×
265
  }
266

267
  (*pRequest)->allocatorRefId = -1;
11,631,702✔
268
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
11,631,702✔
269
    if (TSDB_CODE_SUCCESS !=
1,429,089!
270
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,429,040✔
271
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
272
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
273
      destroyRequest(*pRequest);
×
274
      *pRequest = NULL;
×
275
      return terrno;
×
276
    }
277
  }
278

279
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
11,642,146✔
280
  return TSDB_CODE_SUCCESS;
11,637,397✔
281
}
282

283
/*
 * Build a sub-request for sql on the same connection as pRequest and link the two:
 * pRequest's prevRefId points at the new request, while the new request's nextRefId
 * and userRefId point back at pRequest. The new request is flagged as a sub-request.
 *
 * Returns the buildRequest() result; no linking is done on failure.
 */
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pSub = *pNewRequest;
  pRequest->relation.prevRefId = pSub->self;
  pSub->relation.nextRefId = pRequest->self;
  pSub->relation.userRefId = pRequest->self;
  pSub->isSubReq = true;

  return code;
}
294

295
/*
 * Parse the SQL text stored in pRequest into *pQuery.
 *
 * A parse context is filled from the request and its connection, the catalog handle of
 * the connection's cluster is looked up, and qParseSql() is invoked. When the parsed
 * query has a result set, its schema and precision are copied into the request's result
 * info. On success, or on an error for which NEED_CLIENT_HANDLE_ERROR() holds, the
 * db/table/target-table lists are swapped from the query into the request.
 *
 * Returns the catalog, parse or schema-setup error code, or TSDB_CODE_SUCCESS.
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
      .isStmtBind = pRequest->isStmtBind,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };

  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
                              (*pQuery)->pResExtSchema, pRequest->isStmtBind);
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
    }
  }

  // A failed parse may leave *pQuery NULL; only touch the lists when a query object exists.
  // NOTE(review): assumes the caller initialises *pQuery to NULL before the call - confirm.
  if (NULL != *pQuery && (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code))) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // position arrays held by the parse context are no longer needed once parsing is done
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
347

348
/*
 * Run a locally executable command synchronously via qExecCommand() and, when it
 * produced a response, load that response into the request's result info.
 *
 * Returns the qExecCommand() code, or the setQueryResultFromRsp() code when a
 * response was produced.
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int8_t  biMode = atomic_load_8(&pTscObj->biMode);
  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->isStmtBind);
}
360

361
/*
 * Send the command message of a DDL query to the server and block on the request's
 * response semaphore.
 *
 * A query without a command message (e.g. "drop table if exists not_exists_table")
 * succeeds immediately without contacting the server.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  void*         pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTransporter, &pCmd->epSet, NULL, pSendInfo));
  // wait until the response semaphore is posted
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
380

381
/* Return the application instance that the request's connection belongs to. */
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pTscObj = pRequest->pTscObj;
  return pTscObj->pAppInfo;
}
22,372,160✔
382

383
/*
 * Execute a local command and report the outcome through doRequestCallback().
 *
 * For validate-only requests the callback is invoked with code 0 and nothing is run.
 * Otherwise the command is executed, any response is loaded into the request's result
 * info, the resulting code is stored in pRequest->code and passed to the callback.
 */
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->isStmtBind);
  }

  pRequest->code = code;
  if (TSDB_CODE_SUCCESS == code) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
  } else {
    // a failed command exposes no rows
    pResInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
412

413
/*
 * Send the command message of a DDL query to the server without waiting for the reply.
 *
 * Validate-only requests and queries without a command message (e.g.
 * "drop table if exists not_exists_table") complete immediately: the request callback
 * is invoked with code 0. If sending fails, the callback is invoked with the error.
 *
 * Returns TSDB_CODE_SUCCESS for the immediate cases, otherwise the send result.
 */
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pInst->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (code != 0) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
439

440
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
243,651✔
441
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
243,651✔
442
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
243,651✔
443

444
  if (node1->load < node2->load) {
243,651!
445
    return -1;
×
446
  }
447

448
  return node1->load > node2->load;
243,651✔
449
}
450

451
/*
 * Replace the cached qnode list of pInfo with a load-sorted copy of pNodeList.
 *
 * The old list is always destroyed. When pNodeList is NULL the cache simply stays
 * empty. All changes happen while pInfo->qnodeMutex is held.
 *
 * Returns a lock/unlock error, terrno when the list could not be duplicated, or
 * TSDB_CODE_SUCCESS.
 */
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // duplication failed: do not sort or log a NULL array, report the failure instead
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
               taosArrayGetSize(pInfo->pQnodeList));
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    // the duplication error takes precedence; still release the mutex before returning
    (void)taosThreadMutexUnlock(&pInfo->qnodeMutex);
    return code;
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
469

470
/*
 * Tell the caller whether a qnode list still has to be fetched for this request.
 *
 * *required is false under the vnode and client query policies. Under any other policy
 * it is true exactly when the application instance has no cached qnode list; the cache
 * is inspected while holding qnodeMutex.
 *
 * Returns TSDB_CODE_SUCCESS, or the mutex lock/unlock error.
 */
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pInst->qnodeMutex));
  *required = (NULL == pInst->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInst->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
485

486
/*
 * Provide the qnode list for pRequest in *pNodeList.
 *
 * A copy of the application instance's cached list is returned when one exists.
 * Otherwise a new array is created, filled through catalogGetQnodeList() and, on
 * success, also stored in the cache via updateQnodeList().
 *
 * NOTE(review): *pNodeList is tested before this function necessarily writes it, so the
 * caller presumably passes it in as NULL - confirm.
 *
 * Returns a mutex, catalog or allocation error code, or TSDB_CODE_SUCCESS.
 */
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // check the freshly created array, not the out-parameter itself
      if (NULL == *pNodeList) {
        TSC_ERR_RET(terrno);
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
517

518
/*
 * Create the query plan for an already parsed query.
 *
 * The request's type is set to the query's message type, a plan context is filled from
 * the request, its connection and the query, and qCreateQueryPlan() is called with it.
 *
 * Returns the qCreateQueryPlan() result.
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pInst = pTscObj->pAppInfo;

  pRequest->type = pQuery->msgType;

  SPlanContext planCxt = {0};
  planCxt.queryId = pRequest->requestId;
  planCxt.acctId = pTscObj->acctId;
  planCxt.mgmtEpSet = getEpSet_s(&pInst->mgmtEp);
  planCxt.pAstRoot = pQuery->pRoot;
  planCxt.showRewrite = pQuery->showRewrite;
  planCxt.pMsg = pRequest->msgBuf;
  planCxt.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  planCxt.pUser = pTscObj->user;
  planCxt.timezone = pTscObj->optionInfo.timezone;
  planCxt.sysInfo = pTscObj->sysInfo;

  return qCreateQueryPlan(&planCxt, pPlan, pNodeList);
}
535

536
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
1,369,238✔
537
                         const SExtSchema* pExtSchema, bool isStmt) {
538
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
1,369,238!
539
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
540
    return TSDB_CODE_INVALID_PARA;
×
541
  }
542

543
  pResInfo->numOfCols = numOfCols;
1,369,293✔
544
  if (pResInfo->fields != NULL) {
1,369,293✔
545
    taosMemoryFree(pResInfo->fields);
55!
546
  }
547
  if (pResInfo->userFields != NULL) {
1,369,293✔
548
    taosMemoryFree(pResInfo->userFields);
55!
549
  }
550
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
1,369,293!
551
  if (NULL == pResInfo->fields) return terrno;
1,369,265!
552
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,369,265!
553
  if (NULL == pResInfo->userFields) {
1,369,283!
554
    taosMemoryFree(pResInfo->fields);
×
555
    return terrno;
×
556
  }
557
  if (numOfCols != pResInfo->numOfCols) {
1,369,283!
558
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
559
    return TSDB_CODE_FAILED;
×
560
  }
561

562
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
6,765,899✔
563
    pResInfo->fields[i].type = pSchema[i].type;
5,396,675✔
564

565
    pResInfo->userFields[i].type = pSchema[i].type;
5,396,675✔
566
    // userFields must convert to type bytes, no matter isStmt or not
567
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
5,396,675✔
568
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
5,396,621✔
569
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
5,396,557!
570
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
8,810✔
571
    }
572

573
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
5,396,616✔
574
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
5,396,616✔
575
  }
576
  return TSDB_CODE_SUCCESS;
1,369,224✔
577
}
578

579
/*
 * Store the timestamp precision in the result info.
 * Values other than the milli, micro and nano precision constants are ignored.
 */
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  bool known = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
               (precision == TSDB_TIME_PRECISION_NANO);
  if (known) {
    pResInfo->precision = precision;
  }
}
587

588
/*
 * Build the execution node list used under the vnode/client query policies.
 *
 * Every vgroup found in the per-database vgroup arrays of pDbVgList contributes one
 * entry (vgId and epSet). When that yields no entry at all, the content of pMnodeList
 * is used instead; if that is empty too, an empty list is returned.
 *
 * On success *pNodeList receives the new array. On failure the array is destroyed and
 * terrno or TSDB_CODE_OUT_OF_RANGE is returned.
 */
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pNodes = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pNodes) {
    return terrno;
  }
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    // a missing or empty vgroup array contributes nothing
    int32_t numOfVgs = (NULL == pVgroups) ? 0 : taosArrayGetSize(pVgroups);

    for (int32_t vgIdx = 0; vgIdx < numOfVgs; ++vgIdx) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgInfo) {
        taosArrayDestroy(pNodes);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgInfo->vgId;
      entry.addr.epSet = pVgInfo->epSet;

      if (NULL == taosArrayPush(pNodes, &entry)) {
        taosArrayDestroy(pNodes);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(pNodes);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    // no vnode available: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pNodes);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pNodes, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pNodes);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = pNodes;

  return TSDB_CODE_SUCCESS;
}
653

654
/*
 * Build the execution node list used under the qnode query policy.
 *
 * The content of pQnodeList is used when it is non-empty; otherwise the content of
 * pMnodeList; if both are empty, an empty list is returned.
 *
 * On success *pNodeList receives the new array. On failure the array is destroyed and
 * terrno or TSDB_CODE_OUT_OF_RANGE is returned.
 */
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pNodes = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pNodes) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirstQnode) {
      taosArrayDestroy(pNodes);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pNodes, pFirstQnode, qNodeNum)) {
      taosArrayDestroy(pNodes);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    // no qnode available: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pNodes);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pNodes, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pNodes);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = pNodes;

  return TSDB_CODE_SUCCESS;
}
699

700
void freeVgList(void* list) {
44,672✔
701
  SArray* pList = *(SArray**)list;
44,672✔
702
  taosArrayDestroy(pList);
44,672✔
703
}
44,713✔
704

705
/*
 * Build the candidate node list for an asynchronously executed query according to tsQueryPolicy.
 *
 * - VNODE / CLIENT policy: collect the per-db vgroup lists (from pResultMeta when available, otherwise
 *   fetched from the catalog for every db in pRequest->dbList) and pass them to buildVnodePolicyNodeList.
 * - HYBRID / QNODE policy: duplicate the qnode list (from pResultMeta when it carries one, otherwise from
 *   the app instance under qnodeMutex) and pass it to buildQnodePolicyNodeList.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy.
 * The temporary lists built here are always released before returning.
 */
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;  // element destructor for pDbVgList; stays NULL when the elements are borrowed from pResultMeta
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          if (NULL == pRes || pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // the vgroup lists fetched below are collected in pDbVgList and released with it
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // pVgList never made it into pDbVgList, so the cleanup below cannot reach it
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            // release the lock before bailing out, otherwise qnodeMutex stays locked forever
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
814

815
/*
 * Build the candidate node list for a synchronously executed query according to tsQueryPolicy.
 *
 * - VNODE / CLIENT policy: fetch the vgroup list of every db in pRequest->dbList from the catalog and
 *   pass the collection to buildVnodePolicyNodeList.
 * - HYBRID / QNODE policy: fetch the qnode list via getQnodeList and pass it to buildQnodePolicyNodeList.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy.
 * The temporary lists built here are always released before returning.
 */
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // pVgList never made it into pDbVgList, so the cleanup below cannot reach it
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // every element of pDbVgList is a vgroup array fetched above; freeVgList releases each of them
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
880

881
/*
 * Execute the query plan pDag synchronously through the scheduler.
 *
 * The execution result replaces pRequest->body.resInfo.execRes (the previous one is destroyed first).
 * For submit / delete / create-table requests the affected row count is recorded and the scheduler
 * job is freed right away; submit requests additionally update the cluster insert-row counter.
 *
 * Returns the scheduler error when schedulerExecJob fails, otherwise the code carried by the execution
 * result. The same value is stored in pRequest->code and terrno.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SExecResult      res = {0};
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self};
  SSchedulerReq    req = {
         .syncReq = true,
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
         .pConn = &conn,
         .pNodeList = pNodeList,
         .pDag = pDag,
         .sql = pRequest->sqlstr,
         .startTs = pRequest->metric.start,
         .execFp = NULL,
         .cbParam = NULL,
         .chkKillFp = chkRequestKilled,
         .chkKillParam = (void*)pRequest->self,
         .pExecRes = &res,
         .source = pRequest->source,
         .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);

  // take over the new result even on failure, so the old one is never left behind
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));

  if (code != TSDB_CODE_SUCCESS) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
      TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    if (TDMT_VND_SUBMIT == pRequest->type) {
      // record the insert rows
      STscObj*            pTscObj = pRequest->pTscObj;
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
934

935
/*
 * Post-process a submit response: for every auto-created table that carries table meta,
 * forward that meta to handleCreateTbExecRes so the catalog gets updated.
 *
 * pRequest and epset are not used by this handler.
 * Returns TSDB_CODE_SUCCESS, or the first error reported by handleCreateTbExecRes.
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
  if (NULL == pRsp->aCreateTbRsp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
  for (int32_t i = 0; i < tbNum; ++i) {
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
    // skip entries without meta; the NULL guard protects against a failed element lookup
    if (NULL != pTbRsp && pTbRsp->pMeta) {
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
    }
  }

  return TSDB_CODE_SUCCESS;
}
952

953
/*
 * Post-process a query response: `res` is an array of STbVerInfo describing the table versions
 * seen during execution. They are converted to STbSVersion entries and handed to
 * catalogChkTbMetaVersion, using *epset as the mgmt endpoint set.
 *
 * Returns TSDB_CODE_SUCCESS for an empty list, terrno when building the version list fails,
 * otherwise the result of catalogChkTbMetaVersion.
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pTbList = (SArray*)res;
  int32_t numOfTables = taosArrayGetSize(pTbList);
  if (numOfTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVerList = taosArrayInit(numOfTables, sizeof(STbSVersion));
  if (pVerList == NULL) {
    return terrno;
  }

  int32_t code = 0;
  bool    listReady = true;
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pTbList, idx);
    if (pInfo == NULL) {
      code = terrno;
      listReady = false;
      break;
    }

    STbSVersion ver = {
        .tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion, .rver = pInfo->rversion};
    if (taosArrayPush(pVerList, &ver) == NULL) {
      code = terrno;
      listReady = false;
      break;
    }
  }

  // only consult the catalog when the whole version list was built
  if (listReady) {
    SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                             .requestId = pRequest->requestId,
                             .requestObjRefId = pRequest->self,
                             .mgmtEps = *epset};

    code = catalogChkTbMetaVersion(pCatalog, &conn, pVerList);
  }

  taosArrayDestroy(pVerList);
  return code;
}
993

994
/*
 * Apply the table meta carried by an alter-table response to the catalog.
 * `res` is interpreted as an STableMetaRsp; the result of catalogUpdateTableMeta is returned.
 */
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
997

998
/*
 * Hand the table meta carried by a create-table response to the catalog.
 * `res` is interpreted as an STableMetaRsp; the result of catalogAsyncUpdateTableMeta is returned.
 */
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
1001

1002
/*
 * Dispatch the execution result stored in pRequest->body.resInfo.execRes to the handler
 * that matches its message type.
 *
 * Returns pRequest->code when there is no result payload, the catalogGetHandle error when the
 * catalog cannot be obtained, TSDB_CODE_APP_ERROR for an unexpected message type, otherwise the
 * code of the selected handler.
 */
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;
  if (NULL == pExecRes->res) {
    return pRequest->code;
  }

  SCatalog*     pCtg = NULL;
  SAppInstInfo* pInst = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pInst->clusterId, &pCtg);
  if (0 != code) {
    return code;
  }

  SEpSet mgmtEpSet = getEpSet_s(&pInst->mgmtEp);

  switch (pExecRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB:
      code = handleAlterTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_CREATE_TABLE: {
      SArray* pMetaList = (SArray*)pExecRes->res;
      int32_t numOfMeta = taosArrayGetSize(pMetaList);
      int32_t idx = 0;
      // NOTE(review): only the code of the last table survives this loop -- confirm that is intended
      while (idx < numOfMeta) {
        void* pMeta = taosArrayGetP(pMetaList, idx);
        // handleCreateTbExecRes will handle res == null
        code = handleCreateTbExecRes(pMeta, pCtg);
        ++idx;
      }
      break;
    }

    case TDMT_MND_CREATE_STB:
      code = handleCreateTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_SUBMIT:
      // account the inserted bytes before processing the response
      (void)atomic_add_fetch_64((int64_t*)&pInst->summary.insertBytes, pExecRes->numOfBytes);
      code = handleSubmitExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      code = handleQueryExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
1057

1058
/*
 * Returns the fileProcessing flag of a vnode-modify statement; false for every other statement type.
 */
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1061

1062
/*
 * Resume parsing and planning of a request after its sub-query has produced pBlock.
 *
 * The node allocator of the parse context is acquired for the duration of the parse/plan steps.
 * The first error encountered (acquire, parse, plan, or release) is the one reported to
 * handleQueryAnslyseRes; a successful release no longer hides an earlier parse/plan failure.
 */
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  bool    acquired = (TSDB_CODE_SUCCESS == code);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  if (acquired) {
    // always release what was acquired, but keep the earlier error if there is one
    int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
    if (TSDB_CODE_SUCCESS == code) {
      code = releaseCode;
    }
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
453✔
1079

1080
/*
 * Deliver the final result of pRequest to the client.
 *
 * When the request is itself the user-facing request (userRefId is 0 or equals its own ref id)
 * its callback is invoked directly. Otherwise the user-facing request is looked up, receives
 * pRequest's code, and its callback is invoked instead.
 */
void returnToUser(SRequestObj* pRequest) {
  bool isUserRequest = (0 == pRequest->relation.userRefId) || (pRequest->self == pRequest->relation.userRefId);
  if (isUserRequest) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pUserReq) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  // propagate the outcome to the user-facing request, then return to client
  pUserReq->code = pRequest->code;
  doRequestCallback(pUserReq, pUserReq->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1099

1100
/*
 * Convert numOfRows rows fetched from pRes into a newly created data block.
 *
 * Each row must carry at least three non-NULL leading columns, read here as
 * (int64 timestamp, int32 vgId, int64 vgVer). The largest timestamp seen becomes the block's
 * window.ekey.
 *
 * On success *pBlock owns the new block and TSDB_CODE_SUCCESS is returned.
 * On failure the block is destroyed, *pBlock is reset to NULL and an error code is returned.
 */
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  if (numOfFields > 0 && NULL == pResFields) {
    tscError("invalid data from vnode");
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // the fetch may run dry before numOfRows rows are delivered, and the three leading
    // columns are dereferenced below, so all of them have to be present
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      if (NULL == pColInfoData) {
        code = TSDB_CODE_OUT_OF_RANGE;
        goto _error;
      }
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  // never hand a destroyed block back to the caller
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1156

1157
/*
 * Fetch callback of a sub-query: turns the fetched rows into a data block and resumes the
 * follow-up request (relation.nextRefId) with it.
 *
 * `res` is the sub-query request object; `param` is not used here. When the sub-query failed, or
 * the block cannot be built, the error is returned to the user instead.
 */
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pRequest = (SRequestObj*)res;
  if (0 != pRequest->code) {
    returnToUser(pRequest);
    return;
  }

  SSDataBlock* pBlock = NULL;
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
             tstrerror(pRequest->code));
    returnToUser(pRequest);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    // NOTE(review): nothing is reported back to the user on this path -- confirm that is intended
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    blockDataDestroy(pBlock);
    return;
  }

  continuePostSubQuery(pNextReq, pBlock);
  (void)releaseRequest(pRequest->relation.nextRefId);

  // the block is only needed while the follow-up request is being resumed
  blockDataDestroy(pBlock);
}
1184

1185
/*
 * Continue with the follow-up request once a sub-query has been executed.
 *
 * A query-type result is fetched asynchronously first (postSubQueryFetchCb takes over from
 * there); any other result resumes the follow-up request immediately without a data block.
 */
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pRequest = pWrapper->pRequest;
  if (TD_RES_QUERY(pRequest)) {
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    return;
  }

  continuePostSubQuery(pNextReq, NULL);
  (void)releaseRequest(pRequest->relation.nextRefId);
}
1201

1202
// TODO: refactor the error code management
1203
/*
 * Scheduler completion callback for an asynchronously executed request.
 *
 * param is the SSqlCallbackWrapper of the request. The callback
 *   1. stores `code` and takes over the content of *pResult (the pResult container itself is freed),
 *   2. for submit / delete / create-table requests accumulates the row count and frees the job,
 *   3. restarts the query when the error is one the client should handle itself,
 *   4. otherwise post-processes the response and either continues a CSV insert, continues with the
 *      follow-up request, or returns the result to the client.
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;

  pRequest->code = code;
  if (NULL != pResult) {
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t msgType = pRequest->type;
  bool    isWriteReq = (TDMT_VND_SUBMIT == msgType) || (TDMT_VND_DELETE == msgType) || (TDMT_VND_CREATE_TABLE == msgType);
  if (isWriteReq) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (TDMT_VND_SUBMIT == msgType) {
        SAppClusterSummary* pSummary = &pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  // the content was copied into the request above; only the container is released here
  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while post-processing the response only matters when execution itself succeeded
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code && TSDB_CODE_SUCCESS != rspCode) {
    pRequest->code = rspCode;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  // NOTE(review): the scheduler's `code` is passed here, not pRequest->code -- confirm that is intended
  doRequestCallback(pRequest, code);
}
1271

1272
/*
 * Execute a parsed query synchronously, according to its execution mode.
 *
 * - LOCAL:        run the command locally (fails with TSDB_CODE_INVALID_PARA when there is no AST root)
 * - RPC:          run it as a DDL query
 * - SCHEDULE:     create the plan, build the node list and run it through the scheduler
 * - EMPTY_RESULT: only mark the request as an empty-result retrieve
 * LOCAL / RPC execution and scheduling are skipped when pRequest->validateOnly is set.
 *
 * pQuery is destroyed unless keepQuery is true. Any failure is stored in pRequest->code.
 * When res is not NULL, ownership of the execution result payload is moved to *res.
 */
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
  }

  // count the request once; retries must not be counted again
  if (pQuery->pRoot && !pRequest->inRetry) {
    STscObj*            pTscObj = pRequest->pTscObj;
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (!pRequest->validateOnly) {
        if (NULL == pQuery->pRoot) {
          terrno = TSDB_CODE_INVALID_PARA;
          code = terrno;
        } else {
          code = execLocalCmd(pRequest, pQuery);
        }
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }
      SQueryPlan* pDag = NULL;
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        bool planScheduled = false;
        pRequest->body.subplanNum = pDag->numOfSubplans;
        if (!pRequest->validateOnly) {
          SArray* pNodeList = NULL;
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
          if (TSDB_CODE_SUCCESS == code) {
            planScheduled = true;
            code = scheduleQuery(pRequest, pDag, pNodeList);
          }
          taosArrayDestroy(pNodeList);
        }
        if (!planScheduled) {
          // the plan never reached the scheduler, so nobody else will release it
          qDestroyQueryPlan(pDag);
        }
      }
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
               pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // hand the result payload over to the caller; the request must not free it afterwards
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
66,220✔
1361

1362
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
11,251,890✔
1363
                                 SSqlCallbackWrapper* pWrapper) {
1364
  int32_t code = TSDB_CODE_SUCCESS;
11,251,890✔
1365
  pRequest->type = pQuery->msgType;
11,251,890✔
1366
  SArray*     pMnodeList = NULL;
11,251,890✔
1367
  SQueryPlan* pDag = NULL;
11,251,890✔
1368
  int64_t     st = taosGetTimestampUs();
11,249,459✔
1369

1370
  if (!pRequest->parseOnly) {
11,249,459!
1371
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
11,253,647✔
1372
    if (NULL == pMnodeList) {
11,242,918!
1373
      code = terrno;
×
1374
    }
1375
    SPlanContext cxt = {.queryId = pRequest->requestId,
22,512,433✔
1376
                        .acctId = pRequest->pTscObj->acctId,
11,242,918✔
1377
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
11,242,918✔
1378
                        .pAstRoot = pQuery->pRoot,
11,269,515✔
1379
                        .showRewrite = pQuery->showRewrite,
11,269,515✔
1380
                        .isView = pWrapper->pParseCtx->isView,
11,269,515✔
1381
                        .isAudit = pWrapper->pParseCtx->isAudit,
11,269,515✔
1382
                        .pMsg = pRequest->msgBuf,
11,269,515✔
1383
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1384
                        .pUser = pRequest->pTscObj->user,
11,269,515✔
1385
                        .sysInfo = pRequest->pTscObj->sysInfo,
11,269,515✔
1386
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
11,269,515✔
1387
                        .allocatorId = pRequest->allocatorRefId};
11,269,515✔
1388
    if (TSDB_CODE_SUCCESS == code) {
11,269,515!
1389
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
11,270,192✔
1390
    }
1391
    if (code) {
11,212,534✔
1392
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,230!
1393
               pRequest->requestId);
1394
    } else {
1395
      pRequest->body.subplanNum = pDag->numOfSubplans;
11,211,304✔
1396
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
11,211,304✔
1397
    }
1398
  }
1399

1400
  pRequest->metric.execStart = taosGetTimestampUs();
11,202,932✔
1401
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
11,202,932✔
1402

1403
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
22,433,056!
1404
    SArray* pNodeList = NULL;
11,223,942✔
1405
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
11,223,942✔
1406
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
964,791✔
1407
    }
1408

1409
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
11,223,983✔
1410
                             .requestId = pRequest->requestId,
11,204,346✔
1411
                             .requestObjRefId = pRequest->self};
11,204,346✔
1412
    SSchedulerReq    req = {
22,396,258✔
1413
           .syncReq = false,
1414
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
11,204,346✔
1415
           .pConn = &conn,
1416
           .pNodeList = pNodeList,
1417
           .pDag = pDag,
1418
           .allocatorRefId = pRequest->allocatorRefId,
11,204,346✔
1419
           .sql = pRequest->sqlstr,
11,204,346✔
1420
           .startTs = pRequest->metric.start,
11,204,346✔
1421
           .execFp = schedulerExecCb,
1422
           .cbParam = pWrapper,
1423
           .chkKillFp = chkRequestKilled,
1424
           .chkKillParam = (void*)pRequest->self,
11,204,346✔
1425
           .pExecRes = NULL,
1426
           .source = pRequest->source,
11,204,346✔
1427
           .pWorkerCb = getTaskPoolWorkerCb(),
11,204,346✔
1428
    };
1429
    if (TSDB_CODE_SUCCESS == code) {
11,191,912!
1430
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
11,198,494✔
1431
    }
1432

1433
    taosArrayDestroy(pNodeList);
11,228,895✔
1434
  } else {
1435
    qDestroyQueryPlan(pDag);
×
1436
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,660✔
1437
             pRequest->requestId);
1438
    destorySqlCallbackWrapper(pWrapper);
1,660✔
1439
    pRequest->pWrapper = NULL;
1,660✔
1440
    if (TSDB_CODE_SUCCESS != code) {
1,660✔
1441
      pRequest->code = terrno;
1,230✔
1442
    }
1443

1444
    doRequestCallback(pRequest, code);
1,660✔
1445
  }
1446

1447
  // todo not to be released here
1448
  taosArrayDestroy(pMnodeList);
11,231,784✔
1449

1450
  return code;
11,234,849✔
1451
}
1452

1453
/**
 * Hand a parsed statement to the executor that matches its execution mode.
 *
 * Parse-only requests complete immediately with code 0. Otherwise the mode is
 * recorded on the request, the cluster-wide insert/select counters are bumped
 * (skipped while the request is in retry), and the statement is dispatched.
 *
 * @param pRequest     request being executed
 * @param pQuery       parsed statement; supplies execMode and pRoot
 * @param pResultMeta  metadata forwarded to the scheduler path only
 * @param pWrapper     callback wrapper; destroyed here for every mode except
 *                     QUERY_EXEC_MODE_SCHEDULE, which receives it instead
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;

  // only the scheduler path takes the wrapper; release it for all other modes
  if (pRequest->body.execMode != QUERY_EXEC_MODE_SCHEDULE) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  // per-cluster request statistics
  if (pQuery->pRoot != NULL && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    switch (pQuery->pRoot->type) {
      case QUERY_NODE_VNODE_MODIFY_STMT:
        if (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType) {
          (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
        }
        break;
      case QUERY_NODE_SELECT_STMT:
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
        break;
      default:
        break;
    }
  }

  // the return codes of the RPC and scheduler paths are not consumed here
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_RPC:
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_SCHEDULE:
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
  }
}
1499

1500
/**
 * Refresh the cached catalog entries for every database and table recorded on
 * the request.
 *
 * @param pTscObj   connection object; supplies cluster id, transporter and mnode epset
 * @param pRequest  request whose dbList / tableList are refreshed
 * @return TSDB_CODE_APP_ERROR when both lists are empty, the first failing
 *         catalog call's code, or TSDB_CODE_SUCCESS
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numTables = taosArrayGetSize(pRequest->tableList);
  if (numDbs <= 0 && numTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   status = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != status) {
    return status;
  }

  SRequestConnInfo connInfo = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  int32_t idx = 0;
  while (idx < numDbs) {
    // a NULL entry is passed through; catalogRefreshDBVgInfo is expected to cope with it
    char* dbFName = taosArrayGet(pRequest->dbList, idx);
    status = catalogRefreshDBVgInfo(pCtg, &connInfo, dbFName);
    if (TSDB_CODE_SUCCESS != status) {
      return status;
    }
    ++idx;
  }

  idx = 0;
  while (idx < numTables) {
    // a NULL entry is passed through; catalogRefreshTableMeta is expected to cope with it
    SName* pTableName = taosArrayGet(pRequest->tableList, idx);
    status = catalogRefreshTableMeta(pCtg, &connInfo, pTableName, -1);
    if (TSDB_CODE_SUCCESS != status) {
      return status;
    }
    ++idx;
  }

  return status;
}
1542

1543
/**
 * Drop the cached catalog metadata of every name in tbList.
 *
 * @param pTscObj  connection object; supplies the cluster id
 * @param tbList   array of SName entries
 * @param isView   true to remove view metadata, false to remove table metadata
 * @return TSDB_CODE_SUCCESS, the catalogGetHandle failure code, or whatever
 *         TSC_ERR_RET returns for the first failing removal
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog* pCtg = NULL;
  int32_t   total = taosArrayGetSize(tbList);
  int32_t   status = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != status) {
    return status;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      // table entries are handed to the catalog as-is, including NULL
      TSC_ERR_RET(catalogRemoveTableMeta(pCtg, pName));
      continue;
    }

    // view entries: missing names are skipped
    if (pName == NULL) {
      continue;
    }

    char dbFName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, dbFName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCtg, dbFName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1570

1571
/**
 * Build the mnode endpoint set from the configured first/second endpoints.
 *
 * Each non-empty endpoint string is parsed into the next free slot of
 * pEpSet->epSet and its FQDN is resolved. A slot is kept (numOfEps is
 * incremented) only when resolution succeeds; otherwise the slot is zeroed and
 * the failure is only logged.
 *
 * @param firstEp   primary endpoint string, may be NULL or empty
 * @param secondEp  secondary endpoint string, may be NULL or empty
 * @param pEpSet    output; version, numOfEps and inUse are reset on entry
 * @return 0 on success;
 *         TSDB_CODE_TSC_INVALID_FQDN (terrno set) when an endpoint string is
 *         too long or firstEp cannot be parsed;
 *         the parser's code when secondEp cannot be parsed;
 *         TSDB_CODE_RPC_NETWORK_UNAVAIL (terrno set) when no endpoint resolved
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // return the error code itself (was a bare -1), matching the secondEp path
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // discard the unresolved entry so the slot can be reused by secondEp
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  // neither endpoint could be resolved
  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1630

1631
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
33,328✔
1632
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1633
  *pTscObj = NULL;
33,328✔
1634
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
33,328✔
1635
  if (TSDB_CODE_SUCCESS != code) {
33,330!
1636
    return code;
×
1637
  }
1638

1639
  SRequestObj* pRequest = NULL;
33,330✔
1640
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
33,330✔
1641
  if (TSDB_CODE_SUCCESS != code) {
33,313!
1642
    destroyTscObj(*pTscObj);
×
1643
    return code;
×
1644
  }
1645

1646
  pRequest->sqlstr = taosStrdup("taos_connect");
33,313!
1647
  if (pRequest->sqlstr) {
33,316!
1648
    pRequest->sqlLen = strlen(pRequest->sqlstr);
33,316✔
1649
  } else {
1650
    return terrno;
×
1651
  }
1652

1653
  SMsgSendInfo* body = NULL;
33,316✔
1654
  code = buildConnectMsg(pRequest, &body);
33,316✔
1655
  if (TSDB_CODE_SUCCESS != code) {
33,267!
1656
    destroyTscObj(*pTscObj);
×
1657
    return code;
×
1658
  }
1659

1660
  // int64_t transporterId = 0;
1661
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
33,267✔
1662
  if (TSDB_CODE_SUCCESS != code) {
33,323!
1663
    destroyTscObj(*pTscObj);
×
1664
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1665
    return code;
×
1666
  }
1667
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
33,323!
1668
    destroyTscObj(*pTscObj);
×
1669
    tscError("failed to wait sem, code:%s", terrstr());
×
1670
    return terrno;
×
1671
  }
1672
  if (pRequest->code != TSDB_CODE_SUCCESS) {
33,329✔
1673
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
64!
1674
    tscError("failed to connect to server, reason: %s", errorMsg);
64!
1675

1676
    terrno = pRequest->code;
64✔
1677
    destroyRequest(pRequest);
64✔
1678
    taos_close_internal(*pTscObj);
64✔
1679
    *pTscObj = NULL;
64✔
1680
    return terrno;
64✔
1681
  } else {
1682
    tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
33,265✔
1683
            (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1684
    destroyRequest(pRequest);
33,270✔
1685
  }
1686
  return code;
33,259✔
1687
}
1688

1689
/**
 * Allocate and fill the send-info for a TDMT_MND_CONNECT message.
 *
 * The send-info carries a heap copy of the request ref id as callback param
 * and the serialized SConnectReq as payload.
 *
 * @param pRequest      connect request; supplies ids and the connection object
 * @param pMsgSendInfo  output; set to the new send-info on success and to NULL
 *                      on failure (everything allocated here is released)
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     contLen = 0;
  void*       pReq = NULL;
  char*       db = NULL;
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (*pMsgSendInfo == NULL) {
    return terrno;
  }

  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;

  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
  (*pMsgSendInfo)->requestId = pRequest->requestId;
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == (*pMsgSendInfo)->param) {
    code = terrno;
    goto _error;
  }

  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;

  db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    // no database name and an error is pending: treat it as a failure
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));

  // first pass computes the encoded size, second pass encodes into pReq
  contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  if (contLen < 0) {
    // never pass a negative size to the allocator
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (tSerializeSConnectReq(pReq, contLen, &connectReq) < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  (*pMsgSendInfo)->msgInfo.len = contLen;
  (*pMsgSendInfo)->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_error:
  // release everything allocated above, including the callback param that the
  // previous error paths leaked, and do not leave a dangling output pointer
  taosMemoryFree(pReq);
  taosMemoryFree((*pMsgSendInfo)->param);
  taosMemoryFree(*pMsgSendInfo);
  *pMsgSendInfo = NULL;
  return code;
}
1746

1747
/**
 * Apply an endpoint-set change reported with a response to the client caches.
 *
 * Mnode targets update the application's management epset; vnode targets
 * update the vgroup epset in the catalog and release target.dbFName. Any other
 * target type is only logged.
 *
 * @param pSendInfo  send-info of the original message; supplies the target
 * @param pTscObj    connection owning the request, or NULL when unavailable
 * @param pMsg       response message (used for logging)
 * @param pEpSet     new endpoint set; NULL means nothing changed
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  if (TARGET_TYPE_MNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SEpSet* pCurrent = &pTscObj->pAppInfo->mgmtEp.epSet;
    SEp*    pCurrentEp = &pCurrent->eps[pCurrent->inUse];
    SEp*    pIncomingEp = &pEpSet->eps[pEpSet->inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pCurrent->inUse, pCurrent->numOfEps,
             pCurrentEp->fqdn, pCurrentEp->port, pEpSet->inUse, pEpSet->numOfEps, pIncomingEp->fqdn,
             pIncomingEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    return;
  }

  if (TARGET_TYPE_VNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCtg = NULL;
    int32_t   status = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS != status) {
      tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(status));
      return;
    }

    status = catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    if (TSDB_CODE_SUCCESS != status) {
      tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(status));
      return;
    }

    // the db name is released only after a successful catalog update
    taosMemoryFreeClear(pSendInfo->target.dbFName);
    return;
  }

  tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
}
1796

1797
/**
 * Deliver one RPC response to the handler registered in its send-info.
 *
 * The response payload is copied into a fresh buffer that is passed to the
 * handler; pMsg->pCont and the send-info are released here. pEpSet is freed on
 * the early error paths and otherwise passed to the handler through
 * SDataBuf.pEpSet.
 *
 * @param pMsg    response message; info.ahandle must be the SMsgSendInfo
 * @param pEpSet  heap copy of the updated endpoint set, or NULL
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INTERNAL_ERROR when the handle is
 *         missing or the acquired request does not match the send-info
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*     pTscObj = NULL;
  SRequestObj* pRequest = NULL;  // non-NULL while we hold a reference from clientReqRefPool

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // hex conversion to match the "0x" prefix (was PRId64)
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64, pRequest->self,
                 pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  if (pMsg->contLen > 0) {
    // the handler receives its own copy; pMsg->pCont is freed below
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // release the reference whenever one was acquired, independent of whether
  // the request carried a connection object
  if (pRequest) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1867

1868
int32_t doProcessMsgFromServer(void* param) {
15,340,947✔
1869
  AsyncArg* arg = (AsyncArg*)param;
15,340,947✔
1870
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
15,340,947✔
1871
  taosMemoryFree(arg);
15,336,943!
1872
  return code;
15,340,385✔
1873
}
1874

1875
/**
 * RPC response callback: normalise the error code, then queue the response
 * for asynchronous processing. If queuing is not possible the response is
 * processed inline on the calling thread.
 *
 * @param parent  not used by this function
 * @param pMsg    response message
 * @param pEpSet  updated endpoint set, or NULL; a heap copy is made for the task
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpSetCopy = NULL;
  AsyncArg* pArg = NULL;

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  if (NULL != pEpSet) {
    pEpSetCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (pEpSetCopy == NULL) {
      code = terrno;
      pMsg->code = terrno;
      goto _exit;
    }
    (void)memcpy((void*)pEpSetCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is a response message; TDMT_MND_CONNECT + 1 is the connect response
  const bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);
  if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    // connect: restore the original code; others: uniform to SOMENODE_NOT_CONNECTED
    pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  } else if (isConnectRsp && pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (pArg == NULL) {
    code = terrno;
    pMsg->code = code;
    goto _exit;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpSetCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (code == 0) {
    return;
  }

  // scheduling failed: drop the container and fall back to inline processing
  pMsg->code = code;
  taosMemoryFree(pArg);

_exit:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpSetCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
1930

1931
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
4✔
1932
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
4!
1933
  if (user == NULL) {
4!
1934
    user = TSDB_DEFAULT_USER;
×
1935
  }
1936

1937
  if (auth == NULL) {
4!
1938
    tscError("No auth info is given, failed to connect to server");
×
1939
    return NULL;
×
1940
  }
1941

1942
  STscObj* pObj = NULL;
4✔
1943
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
4✔
1944
  if (TSDB_CODE_SUCCESS == code) {
4✔
1945
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1!
1946
    if (NULL == rid) {
1!
1947
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1948
    }
1949
    *rid = pObj->id;
1✔
1950
    return (TAOS*)rid;
1✔
1951
  }
1952

1953
  return NULL;
3✔
1954
}
1955

1956
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1957
//                      const char* db, int dbLen, uint16_t port) {
1958
//   char ipStr[TSDB_EP_LEN] = {0};
1959
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1960
//   char userStr[TSDB_USER_LEN] = {0};
1961
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1962
//
1963
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1964
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1965
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1966
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1967
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1968
// }
1969

1970
/**
 * Point row[] / length[] at the cells of the current result row.
 *
 * For every column, row[i] is set to the cell data and length[i] to its byte
 * length; NULL cells get row[i] = NULL and length[i] = 0.
 *
 * @param pResultInfo  result set; `current` selects the row
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t colType = pResultInfo->fields[col].type;
    int32_t colBytes = calcSchemaBytesFromTypeBytes(colType, pResultInfo->userFields[col].bytes, false);

    bool hasValue = false;
    if (IS_VAR_DATA_TYPE(colType)) {
      // variable-length cell: located through the per-row offset, -1 marks NULL
      hasValue = !IS_VAR_NULL_TYPE(colType, colBytes) && pColumn->offset[pResultInfo->current] != -1;
      if (hasValue) {
        char* pCell = pColumn->pData + pColumn->offset[pResultInfo->current];

        pResultInfo->length[col] = varDataLen(pCell);
        pResultInfo->row[col] = varDataVal(pCell);
      }
    } else {
      // fixed-length cell: NULL is tracked in the column's null bitmap
      hasValue = !colDataIsNull_f(pColumn->nullbitmap, pResultInfo->current);
      if (hasValue) {
        pResultInfo->row[col] = pColumn->pData + colBytes * pResultInfo->current;
        pResultInfo->length[col] = colBytes;
      }
    }

    if (!hasValue) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
    }
  }
}
1998

1999
/**
 * Synchronously return the next row (or row block) of a query result.
 *
 * When the locally buffered block is exhausted, the next block is fetched
 * through the scheduler and decoded before a row is returned.
 *
 * @param pRequest        request owning the result; NULL yields NULL
 * @param setupOneRowPtr  true to set up row[]/length[] for the current row and
 *                        advance the cursor
 * @param convertUcs4     forwarded to setQueryResultFromRsp
 * @return pResultInfo->row, or NULL when there is no more data or an error
 *         occurred (the error is stored in pRequest->code)
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  const bool needFetch = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);
  if (needFetch) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pInfo->pData,
                                           convertUcs4, pRequest->isStmtBind);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // account the fetched payload in the cluster summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pRequest->body.resInfo.payloadLen);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}
2048

2049
/**
 * Fetch-completion callback used by doAsyncFetchRows: posts the semaphore
 * passed as `param`. `res` and `numOfRows` are not used.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pDone = (tsem_t*)param;

  int32_t rc = tsem_post(pDone);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
2055

2056
/**
 * Return the next row (or row block) of a result, fetching the next block
 * through the asynchronous fetch API and blocking until it has arrived.
 *
 * @param pRequest        request owning the result; NULL yields NULL
 * @param setupOneRowPtr  true to set up row[]/length[] for the current row and
 *                        advance the cursor
 * @param convertUcs4     stored on the result info before fetching
 * @return pResultInfo->row, or NULL when there is no data or pRequest->code
 *         holds an error
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;
    tsem_t sem;
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
      tscError("failed to init sem, code:%s", terrstr());
      // never hand an uninitialised semaphore to the fetch callback or wait on
      // it; report the failure through the request instead
      pRequest->code = terrno;
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // syncFetchFn posts `sem` once the asynchronous fetch has completed
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}
2096

2097
/**
 * Lazily allocate the per-column arrays (row, pCol, length, convertBuf) of a
 * result info. Nothing is done when `row` is already allocated.
 *
 * @param pResInfo  result info; numOfCols gives the array length
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation failed (in which
 *         case all four pointers are released and reset to NULL)
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    int32_t code = terrno;  // capture before the frees below

    // reset the pointers as well: a stale non-NULL `row` would make the next
    // call skip allocation and a later cleanup free the arrays a second time
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2115

2116
/**
 * Convert every NCHAR column of the current result block from UCS-4 to the
 * multi-byte charset, writing the converted cells into convertBuf[i] and
 * re-pointing the column (pData, offsets, row[i]) at that buffer.
 *
 * @param pResultInfo  result block to convert in place
 * @param colLength    per-column payload length; also the size of each
 *                     conversion buffer
 * @param isStmt       forwarded to calcSchemaBytesFromTypeBytes
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, or
 *         TSDB_CODE_TSC_INTERNAL_ERROR when no converter is available or a
 *         converted cell is invalid
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t cd = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if ((iconv_t)-1 == cd) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t maxBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (NULL == pOut) {
      taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
      return terrno;
    }

    pResultInfo->convertBuf[col] = pOut;

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      // offset -1 marks a NULL cell: nothing to convert
      if (-1 == pColumn->offset[row]) {
        continue;
      }

      char* pIn = pColumn->pData + pColumn->offset[row];

      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pIn), varDataLen(pIn), varDataVal(pOut), cd);
      char*   pBufEnd = pResultInfo->convertBuf[col] + colLength[col];
      if (len < 0 || len > maxBytes || (pOut + len) >= pBufEnd) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, maxBytes, (pOut + len), pBufEnd);
        taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // write the var-data header, record the new offset, move to the next slot
      varDataSetLen(pOut, len);
      pColumn->offset[row] = (pOut - pResultInfo->convertBuf[col]);
      pOut += (len + VARSTR_HEADER_SIZE);
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2163

2164
/**
 * Render every decimal column of the current result block as text.
 *
 * Each cell is formatted with decimalToStr into a fixed 64-byte slot of
 * convertBuf[i]; the column (pData, row[i]) is then re-pointed at that buffer
 * and the field widths are set to the slot size.
 *
 * @param pResultInfo  result block to convert in place
 * @return 0 on success, terrno when the buffer cannot be (re)allocated, or the
 *         first decimalToStr failure code
 */
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pExtField = pResultInfo->fields + col;
    TAOS_FIELD*   pUserField = pResultInfo->userFields + col;
    int32_t       colType = pExtField->type;

    // only decimal columns that actually carry data are converted
    if (!IS_DECIMAL_TYPE(colType) || !pResultInfo->pCol[col].pData) {
      continue;
    }

    int32_t slotLen = 64;
    char*   pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], slotLen * pResultInfo->numOfRows);
    // the field widths are updated before the allocation result is checked
    pExtField->bytes = slotLen;
    pUserField->bytes = slotLen;
    if (NULL == pOut) {
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      DecimalWord* pCell = (DecimalWord*)(pResultInfo->pCol[col].pData + row * tDataTypes[colType].bytes);

      int32_t code = decimalToStr(pCell, colType, pExtField->precision, pExtField->scale, pOut, slotLen);
      pOut += slotLen;
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  return 0;
}
2195

2196
/**
 * Size in bytes of the metadata that precedes the per-column length array in a
 * version-1 result block.
 *
 * The fixed part is five int32 fields plus one uint64 field; it is followed by
 * one (int8 + int32) schema entry per column.
 * NOTE(review): presumably version, total length, rows, columns, flag segment
 * and block group id, per the layout comment in estimateJsonLen - confirm.
 *
 * @param p          block start; not used, the size depends on numOfCols only
 * @param numOfCols  number of columns in the block
 * @return metadata size in bytes
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  (void)p;

  size_t fixedPart = sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t);
  size_t perColumn = sizeof(int8_t) + sizeof(int32_t);

  // the narrowing from size_t is intentional and now explicit
  return (int32_t)(fixedPart + (size_t)numOfCols * perColumn);
}
2200

2201
/*
 * Estimate an upper bound, in bytes, for the data block once every JSON
 * column has been rewritten as text (see doConvertJson, which allocates its
 * output buffer from this value).
 *
 * Non-JSON columns keep their original size. For a JSON column the larger of
 * the original column length and the summed per-row text estimates is used.
 *
 * Returns the estimated length (> 0) on success, TSDB_CODE_TSC_INTERNAL_ERROR
 * if the column count stored in the block disagrees with pResultInfo, or -1
 * if a JSON value has an unknown inner type.
 */
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  // Longest text "%.9lf" can produce for a finite double:
  // sign + 309 integer digits + '.' + 9 fraction digits.
  // The previous budget of 32 bytes was too small for large magnitudes (e.g. 1e300).
  enum { JSON_DOUBLE_STR_MAX_LEN = 1 + 309 + 1 + 9 };

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;

  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;

  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
  // length |
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
  int32_t* colLength = (int32_t*)(p + len);
  len += sizeof(int32_t) * numOfCols;

  char* pStart = p + len;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      // A JSON column starts with one int32 offset per row, followed by the values.
      int32_t* offset = (int32_t*)pStart;
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
      len += lenTmp;
      pStart += lenTmp;

      int32_t estimateColLen = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {  // no value stored for this row
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
        } else if (tTagIsJson(data)) {
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          estimateColLen += (VARSTR_HEADER_SIZE + JSON_DOUBLE_STR_MAX_LEN);
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          estimateColLen += (VARSTR_HEADER_SIZE + 5);  // "false" is the longer literal
        } else {
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
          return -1;
        }
      }
      len += TMAX(colLen, estimateColLen);
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      int32_t lenTmp = numOfRows * sizeof(int32_t);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    } else {
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    }
    pStart += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  len += sizeof(bool);
  return len;
}
2272

2273
/*
 * Convert every JSON column of the current data block into its textual form.
 *
 * If the block has no JSON column nothing happens. Otherwise a new block is
 * built in pResultInfo->convertJson: header, schema, and non-JSON columns are
 * copied verbatim, while each JSON value is rendered as a var-string (null
 * literal, JSON object text, quoted string, double, or bool). Offsets and the
 * column length of each JSON column, as well as the total block length, are
 * rewritten accordingly, and pResultInfo->pData is switched to the new block.
 *
 * The output buffer is sized by estimateJsonLen(); every write of column data
 * is checked against that size so an estimate that turns out too small (or a
 * corrupt column length) yields an error instead of a heap overflow.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation/parse failure, or
 * TSDB_CODE_TSC_INTERNAL_ERROR on inconsistent input.
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;
  char* pEnd1 = p1 + dataLen;  // one past the last writable byte of the output block

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // Fixed header + column schema are copied unchanged.
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // Per-column length array; JSON entries are patched below.
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;    // read cursor in the source block
  char* pStart1 = p1;  // write cursor in the converted block
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen < 0) {
      tscError("doConvertJson error: negative colLen:%d", colLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      if (len > pEnd1 - pStart1) {
        tscError("doConvertJson error: offsets of column:%d exceed dataLen:%d", i, dataLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      len = 0;  // running length of the converted values of this column
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {  // no value for this row; the copied offset stays -1
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                         varDataVal(dst) + CHAR_BYTES, pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;  // emit an empty quoted string
          }
          varDataSetLen(dst, length + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // The buffer was sized from an estimate; never write past it.
        int32_t valLen = (int32_t)varDataTLen(dst);
        if (valLen > pEnd1 - (pStart1 + len)) {
          tscError("doConvertJson error: converted value of column:%d row:%d exceeds dataLen:%d", i, j, dataLen);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, valLen);
        len += valLen;
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else {
      // Non-JSON column: copy the leading offset array / null bitmap and the data unchanged.
      if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
        len = numOfRows * sizeof(int32_t);
      } else {
        len = BitmapLen(pResultInfo->numOfRows);
      }
      if ((int64_t)len + colLen > pEnd1 - pStart1) {
        tscError("doConvertJson error: data of column:%d exceeds dataLen:%d", i, dataLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side. The output buffer is
  // zero-initialized, so the field is left as 0 rather than copied.
  totalLen += sizeof(bool);

  // Second int32 of the header is the total block length.
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2426

2427
/*
 * Parse the data block in pResultInfo->pData and point the per-column
 * accessors (offset array or null bitmap, data pointer, current row pointer,
 * length) of pResultInfo at it.
 *
 * Steps: prepare the result pointers, convert JSON columns to text, validate
 * the block header against pResultInfo, walk the column schema and the
 * per-column length array, then optionally convert NCHAR and decimal columns.
 * For BLOCK_VERSION_1 blocks the stored column lengths are byte-swapped in
 * place.
 *
 * pResultInfo  result descriptor; numOfRows/numOfCols/fields/pData must be set
 * convertUcs4  when true, run doConvertUCS4 (unless compiled out) and the
 *              decimal-to-text conversion
 * isStmt       forwarded to calcSchemaBytesFromTypeBytes and doConvertUCS4
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR on invalid
 * arguments or an inconsistent block, or the error of a failed helper.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  bool convertForDecimal = convertUcs4;
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // Skip the flag segment (int32) and the block group id (uint64); neither is used here.
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // check fields
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int8_t type = *(int8_t*)p;
    p += sizeof(int8_t);

    int32_t bytes = *(int32_t*)p;
    p += sizeof(int32_t);

    // Precision/scale not known yet: recover them from the bytes field of the schema.
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
    }
  }

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[i] = htonl(colLength[i]);
    }
    // A negative length would move the cursor backwards, so reject it as well.
    if (colLength[i] < 0 || colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // Variable-length column: one int32 offset per row precedes the data.
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      // Fixed-length column: a null bitmap precedes the data.
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // bool blankFill = *(bool*)p;
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2547

2548
/*
 * Return a heap-allocated copy of the connection's current database name.
 *
 * The name is read while holding pObj->mutex. terrno is reset to
 * TSDB_CODE_SUCCESS on entry. Returns NULL when no database is set or when
 * the copy could not be allocated (an error is logged in the latter case).
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pObj->mutex);

  size_t nameLen = strlen(pObj->db);
  if (nameLen != 0) {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (NULL == dbCopy) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2563

2564
/*
 * Set the connection's current database name to db.
 *
 * The copy is bounded by the size of pTscObj->db and done under
 * pTscObj->mutex. If either argument is NULL an error is logged and nothing
 * is changed.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (NULL != db && NULL != pTscObj) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
    return;
  }

  tscError("setConnectionDB para is NULL");
}
2574

2575
/*
 * Clear the connection's current database name (make it the empty string).
 * Done under pTscObj->mutex; a NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (NULL != pTscObj) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = '\0';
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2584

2585
/*
 * Make sure pResultInfo->decompBuf can hold at least `size` bytes.
 * Allocates the buffer on first use and grows it when it is too small;
 * decompBufSize is updated accordingly. Returns TSDB_CODE_SUCCESS or terrno.
 */
static int32_t ensureDecompBufSize(SReqResultInfo* pResultInfo, int32_t size) {
  if (pResultInfo->decompBuf == NULL) {
    pResultInfo->decompBuf = taosMemoryMalloc(size);
    if (pResultInfo->decompBuf == NULL) {
      tscError("failed to prepare the decompress buffer, size:%d", size);
      return terrno;
    }
    pResultInfo->decompBufSize = size;
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->decompBufSize < size) {
    // Keep the old buffer valid if the reallocation fails.
    char* p = taosMemoryRealloc(pResultInfo->decompBuf, size);
    if (p == NULL) {
      tscError("failed to prepare the decompress buffer, size:%d", size);
      return terrno;
    }
    pResultInfo->decompBuf = p;
    pResultInfo->decompBufSize = size;
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Install a retrieve response as the current result block of pResultInfo.
 *
 * Frees the previously held response message and stores pRsp in its place,
 * copies row count / completion / precision from the response, decompresses
 * the payload when it is marked compressed and the compressed length is
 * smaller than the raw length, and finally calls setResultDataPtr() to set up
 * the column pointers.
 *
 * The payload starts with two int32 values (compressed length, raw length)
 * followed by the data. The decompression buffer is sized for payloadLen as
 * before and, in addition, is guaranteed to hold rawLen bytes before
 * decompressing, since rawLen is what the decompressor is allowed to write.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or an error code
 * when the payload is inconsistent or cannot be decompressed.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);
  int32_t code = TSDB_CODE_SUCCESS;

  if (pRsp->compressed) {
    code = ensureDecompBufSize(pResultInfo, payloadLen);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (payloadLen > 0) {
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      // rawLen comes from the payload itself; make sure the output really fits.
      code = ensureDecompBufSize(pResultInfo, rawLen);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      // Data was sent as-is: use it in place.
      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2658

2659
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
5✔
2660
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
5✔
2661
  void*              clientRpc = NULL;
5✔
2662
  SServerStatusRsp   statusRsp = {0};
5✔
2663
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
5✔
2664
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
5✔
2665
  SRpcMsg  rpcRsp = {0};
5✔
2666
  SRpcInit rpcInit = {0};
5✔
2667
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
5✔
2668

2669
  rpcInit.label = "CHK";
5✔
2670
  rpcInit.numOfThreads = 1;
5✔
2671
  rpcInit.cfp = NULL;
5✔
2672
  rpcInit.sessions = 16;
5✔
2673
  rpcInit.connType = TAOS_CONN_CLIENT;
5✔
2674
  rpcInit.idleTime = tsShellActivityTimer * 1000;
5✔
2675
  rpcInit.compressSize = tsCompressMsgSize;
5✔
2676
  rpcInit.user = "_dnd";
5✔
2677

2678
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
5✔
2679
  connLimitNum = TMAX(connLimitNum, 10);
5✔
2680
  connLimitNum = TMIN(connLimitNum, 500);
5✔
2681
  rpcInit.connLimitNum = connLimitNum;
5✔
2682
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
5✔
2683
  rpcInit.readTimeout = tsReadTimeout;
5✔
2684
  rpcInit.ipv6 = tsEnableIpv6;
5✔
2685
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
5!
2686
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2687
    goto _OVER;
×
2688
  }
2689

2690
  clientRpc = rpcOpen(&rpcInit);
5✔
2691
  if (clientRpc == NULL) {
5!
2692
    code = terrno;
×
2693
    tscError("failed to init server status client since %s", tstrerror(code));
×
2694
    goto _OVER;
×
2695
  }
2696

2697
  if (fqdn == NULL) {
5!
2698
    fqdn = tsLocalFqdn;
5✔
2699
  }
2700

2701
  if (port == 0) {
5!
2702
    port = tsServerPort;
5✔
2703
  }
2704

2705
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
5✔
2706
  epSet.eps[0].port = (uint16_t)port;
5✔
2707
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
5✔
2708
  if (TSDB_CODE_SUCCESS != ret) {
5!
2709
    tscError("failed to send recv since %s", tstrerror(ret));
×
2710
    goto _OVER;
×
2711
  }
2712

2713
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
5!
2714
    tscError("failed to send server status req since %s", terrstr());
1!
2715
    goto _OVER;
1✔
2716
  }
2717

2718
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
4!
2719
    tscError("failed to parse server status rsp since %s", terrstr());
×
2720
    goto _OVER;
×
2721
  }
2722

2723
  code = statusRsp.statusCode;
4✔
2724
  if (details != NULL) {
4!
2725
    tstrncpy(details, statusRsp.details, maxlen);
4✔
2726
  }
2727

2728
_OVER:
×
2729
  if (clientRpc != NULL) {
5!
2730
    rpcClose(clientRpc);
5✔
2731
  }
2732
  if (rpcRsp.pCont != NULL) {
5✔
2733
    rpcFreeCont(rpcRsp.pCont);
4✔
2734
  }
2735
  return code;
5✔
2736
}
2737

2738
/*
 * Add one table name to the per-database request map.
 *
 * The name is given as one or two segments of `str`:
 *   - len2 > 0:  segment 1 (pos1/len1) is the database, segment 2 (pos2/len2) the table;
 *   - otherwise: segment 1 is the table and `db` supplies the database.
 *
 * pHash maps the full database name ("<acctId>.<db>") to an STablesReq whose
 * pTables array collects the SName of every table of that database. A new
 * entry is created on first use; on any failure while creating it, the
 * freshly allocated table array is released again.
 *
 * Returns TSDB_CODE_SUCCESS, -1 for an empty/invalid name (including a NULL
 * `db` when it is needed), or an error code on allocation/hash failure.
 */
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    if (db == NULL) {  // no database in the name and no default database
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // First table of this database: build a new entry. Zero it so that no
  // uninitialized bytes are copied into the hash.
  STablesReq newDb = {0};
  newDb.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == newDb.pTables) {
    return terrno;
  }
  tstrncpy(newDb.dbFName, dbFName, TSDB_DB_FNAME_LEN);
  if (NULL == taosArrayPush(newDb.pTables, &name)) {
    int32_t pushCode = terrno;
    taosArrayDestroy(newDb.pTables);
    return pushCode;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &newDb, sizeof(newDb));
  if (code != TSDB_CODE_SUCCESS) {
    // The hash did not take ownership, so the array must be released here.
    taosArrayDestroy(newDb.pTables);
    terrno = code;  // NOTE(review): mirrors what TSC_ERR_RET is presumed to do - confirm
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2796

2797
/*
 * Parse a comma-separated list of table names and group them by database.
 *
 * Each item is `table` or `db.table`. A segment is either a run of
 * [A-Za-z0-9_] or text enclosed in backquotes; whitespace (space, \r, \t, \n)
 * ends an unquoted segment. Items without a database use `dbName`.
 *
 * tbList  NUL-terminated list to parse
 * acctId  account id used to build full database names
 * dbName  default database for items that do not name one
 * pReq    on success receives a new array of STablesReq, one per database;
 *         it is not modified on failure
 *
 * Returns TSDB_CODE_SUCCESS, terrno if the work hash cannot be created, or
 * TSDB_CODE_TSC_INVALID_OPERATION (also stored in terrno) for any parse or
 * allocation failure afterwards. On failure everything allocated here is
 * released.
 */
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;  // result under construction; handed to *pReq only on success

  // vPos/vLen describe up to two segments of the current item (start index, length).
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    const char c = tbList[i];

    if (0 == c) {
      // End of input: close the open segment and flush the last item.
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == c) {
      inEscape = !inEscape;
      if (!inEscape) {
        // Closing backquote: the quoted segment must not be empty.
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // Inside backquotes every character belongs to the segment.
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {  // at most db.table
        goto _return;
      }
      continue;
    }

    if (',' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      // Start the next item.
      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == c || '\r' == c || '\t' == c || '\n' == c) {
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c) || ('0' <= c && '9' >= c) || ('_' == c)) {
      if (vLen[vIdx] > 0) {  // segment already closed (by whitespace or a backquote)
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  pList = taosArrayInit(taosHashGetSize(pHash), sizeof(STablesReq));
  if (NULL == pList) {
    goto _return;
  }
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      taosHashCancelIterate(pHash, pIter);
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  // Ownership of every pTables array moves to the result together with its entry.
  taosHashCleanup(pHash);
  *pReq = pList;

  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  // Entries copied into pList share their pTables with the hash entries, so
  // drop only the container here and release the table arrays via the hash.
  if (pList != NULL) {
    taosArrayDestroy(pList);
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}
2935

2936
/*
 * Completion callback for a synchronous catalog request: records `code` on
 * the waiting request and posts the semaphore in `param` (an
 * SSyncQueryParam). pResult is not used. A failed post is only logged.
 */
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = param;

  pSync->pRequest->code = code;

  int32_t postRc = tsem_post(&pSync->sem);
  if (postRc != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
4✔
2944

2945
void syncQueryFn(void* param, void* res, int32_t code) {
11,593,571✔
2946
  SSyncQueryParam* pParam = param;
11,593,571✔
2947
  pParam->pRequest = res;
11,593,571✔
2948

2949
  if (pParam->pRequest) {
11,593,571✔
2950
    pParam->pRequest->code = code;
11,591,470✔
2951
    clientOperateReport(pParam->pRequest);
11,591,470✔
2952
  }
2953

2954
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
11,594,754!
2955
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2956
  }
2957
}
11,598,937✔
2958

2959
/*
 * Start an asynchronous query on connection `connId`.
 *
 * connId        connection id
 * sql           statement text; must not be NULL or longer than TSDB_MAX_ALLOWED_SQL_LEN
 * fp            completion callback; must not be NULL
 * param         opaque value handed to the request and to `fp`
 * validateOnly  forwarded to buildRequest
 * source        stored in the request's `source` field
 *
 * Every failure sets terrno and, if a callback is available, invokes
 * fp(param, NULL, terrno). On success the request is handed to doAsyncQuery.
 */
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  // Without a callback there is nobody to notify.
  if (NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS == code) {
    pRequest->source = source;
    pRequest->body.queryFp = fp;
    doAsyncQuery(pRequest, false);
    return;
  }

  terrno = code;
  fp(param, NULL, terrno);
}
2992

2993
/*
 * Variant of the asynchronous query entry point that passes a caller-chosen
 * request id through to buildRequest.
 *
 * connId       - connection reference id
 * sql          - NUL-terminated statement text
 * fp           - completion callback; invoked as fp(param, NULL, terrno) on
 *                every failure detected here
 * param        - opaque pointer handed back to fp
 * validateOnly - forwarded to buildRequest
 * reqid        - request id forwarded to buildRequest and used in log lines
 *
 * Failures set terrno.  When fp itself is NULL the function only sets terrno
 * and returns.
 */
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  if (NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid,
             TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pNewReq = NULL;
  int32_t      buildCode = buildRequest(connId, sql, sqlLen, param, validateOnly, &pNewReq, reqid);
  if (TSDB_CODE_SUCCESS != buildCode) {
    terrno = buildCode;
    fp(param, NULL, terrno);
    return;
  }

  pNewReq->body.queryFp = fp;
  doAsyncQuery(pNewReq, false);
}
3026

3027
/*
 * Synchronous query: runs taosAsyncQueryImpl with syncQueryFn as the
 * completion callback and blocks on a semaphore until that callback fires.
 *
 * taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED
 * sql          - statement text, validated by taosAsyncQueryImpl
 * validateOnly - forwarded to taosAsyncQueryImpl
 * source       - forwarded to taosAsyncQueryImpl
 *
 * Returns the request object delivered to the callback, or NULL when setup
 * failed or the callback delivered no request.
 */
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  if (tsem_init(&pSync->sem, 0, 0) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  // NOTE(review): on a failed wait the block is released without destroying
  // the semaphore; whether the callback can still run afterwards is not
  // visible from here -- confirm.
  if (tsem_wait(&pSync->sem) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  int32_t destroyCode = tsem_destroy(&pSync->sem);
  if (destroyCode != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(destroyCode));
  }

  SRequestObj* pResult = NULL;
  if (NULL != pSync->pRequest) {
    pSync->pRequest->syncQuery = true;
    pResult = pSync->pRequest;
    // the waiter is now outside the callback, so clear the flag on the result
    pSync->pRequest->inCallback = false;
  }
  taosMemoryFree(pSync);

  return pResult;
}
3067

3068
/*
 * Synchronous query that carries a caller-supplied request id.  Runs
 * taosAsyncQueryImplWithReqid with syncQueryFn as the completion callback and
 * blocks on a semaphore until that callback fires.
 *
 * taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED
 * sql          - statement text, validated by taosAsyncQueryImplWithReqid
 * validateOnly - forwarded to taosAsyncQueryImplWithReqid
 * reqid        - request id forwarded to taosAsyncQueryImplWithReqid
 *
 * Returns the request object delivered to the callback, or NULL when setup
 * failed or the callback delivered no request.
 */
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // Release the semaphore initialised above before its storage is freed;
  // taosQueryImpl does the same.  A failure here is only logged because the
  // query result is already available.
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // Mirror taosQueryImpl: the waiter runs outside the callback, and
    // doRequestCallback may not have cleared the flag yet when the semaphore
    // is posted.
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  return pRequest;
}
3102

3103
static void fetchCallback(void* pResult, void* param, int32_t code) {
1,068,633✔
3104
  SRequestObj* pRequest = (SRequestObj*)param;
1,068,633✔
3105

3106
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,068,633✔
3107

3108
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
1,068,633✔
3109
           tstrerror(code), pRequest->requestId);
3110

3111
  pResultInfo->pData = pResult;
1,068,621✔
3112
  pResultInfo->numOfRows = 0;
1,068,621✔
3113

3114
  if (code != TSDB_CODE_SUCCESS) {
1,068,621!
3115
    pRequest->code = code;
×
3116
    taosMemoryFreeClear(pResultInfo->pData);
×
3117
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3118
    return;
×
3119
  }
3120

3121
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,068,621!
3122
    taosMemoryFreeClear(pResultInfo->pData);
×
3123
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3124
    return;
×
3125
  }
3126

3127
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
2,137,255✔
3128
                                         pResultInfo->convertUcs4, pRequest->isStmtBind);
1,068,621✔
3129
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,068,634!
3130
    pResultInfo->numOfRows = 0;
×
3131
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
×
3132
             tstrerror(pRequest->code), pRequest->requestId);
3133
  } else {
3134
    tscDebug(
1,068,634✔
3135
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3136
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3137

3138
    STscObj*            pTscObj = pRequest->pTscObj;
1,068,636✔
3139
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,068,636✔
3140
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
1,068,636✔
3141
  }
3142

3143
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
1,068,655✔
3144
}
3145

3146
/*
 * Fetch the next batch of rows for pRequest and deliver it through fp.
 *
 * pRequest - request whose result set is being read
 * fp       - user callback, stored in pRequest->body.fetchFp
 * param    - user pointer, stored in the request's interParam->userParam
 *
 * The callback is invoked directly when there is nothing to fetch (no result
 * fields, an error on the request, or a completed result set).  Otherwise the
 * fetch is handed to the scheduler with fetchCallback as its completion
 * handler.
 */
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // no result columns or an error already recorded: report zero rows
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  // everything has already been handed to the application
  if (pInfo->completed) {
    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pInfo->numOfRows = 0;
    } else if (!pInfo->localResultFetched) {
      // locally executed query: the first fetch keeps numOfRows as it is
      pInfo->localResultFetched = true;
    } else {
      // locally executed query, already fetched once: report end of data
      pInfo->numOfRows = 0;
      pInfo->current = 0;
    }

    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  SSchedulerReq fetchReq = {0};
  fetchReq.syncReq = false;
  fetchReq.fetchFp = fetchCallback;
  fetchReq.cbParam = pRequest;

  int32_t rc = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
  if (rc != TSDB_CODE_SUCCESS) {
    // The user callback is intentionally not invoked on this path.
    // NOTE(review): presumably the scheduler reports the failure through
    // fetchCallback -- confirm.
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
  }
}
3189

3190
/*
 * Invoke the query completion callback registered on pRequest.
 *
 * pRequest - request that has finished
 * code     - final status; a "table not exist" error is rewritten to success
 *            with an empty result when tsQueryTbNotExistAsEmpty is set and
 *            the request is a query
 *
 * The request is marked as being inside a callback for the duration of the
 * call.  The flag is cleared afterwards through a fresh acquire by reference
 * id rather than through pRequest.
 */
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  // remember the reference id up front; pRequest is not used after the callback
  const int64_t selfRid = pRequest->self;
  pRequest->inCallback = true;

  const bool tableMissing = (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) || (TSDB_CODE_TDB_TABLE_NOT_EXIST == code);
  if (tableMissing && tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery) {
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  }

  SRequestObj* pAlive = acquireRequest(selfRid);
  if (NULL == pAlive) {
    return;
  }
  pAlive->inCallback = false;
  (void)releaseRequest(selfRid);
}
11,594,891✔
3212

3213
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
1,828✔
3214
                       SParseSqlRes* pRes) {
3215
#ifndef TD_ENTERPRISE
3216
  return TSDB_CODE_SUCCESS;
3217
#else
3218
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
1,828✔
3219
#endif
3220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc